From 43d03c3df5a630bb929448ed9d30211e101ea958 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 28 Feb 2024 14:22:14 -0800 Subject: [PATCH] fan_out logic Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 6 +- .../alerting/BucketLevelMonitorRunner.kt | 5 +- .../alerting/DocumentLevelMonitorRunner.kt | 403 ++++-- .../org/opensearch/alerting/MonitorRunner.kt | 4 +- .../alerting/MonitorRunnerExecutionContext.kt | 6 +- .../alerting/MonitorRunnerService.kt | 119 +- .../alerting/QueryLevelMonitorRunner.kt | 4 +- .../action/DocLevelMonitorFanOutAction.kt | 15 + .../action/DocLevelMonitorFanOutRequest.kt | 108 ++ .../action/DocLevelMonitorFanOutResponse.kt | 92 ++ .../model/DocumentLevelTriggerRunResult.kt | 5 +- .../alerting/model/IndexExecutionContext.kt | 57 +- .../alerting/model/TriggerRunResult.kt | 14 +- .../alerting/settings/AlertingSettings.kt | 19 +- .../TransportDocLevelMonitorFanOutAction.kt | 1075 +++++++++++++++++ .../TransportExecuteMonitorAction.kt | 4 +- .../TransportExecuteWorkflowAction.kt | 12 +- .../workflow/CompositeWorkflowRunner.kt | 23 +- .../alerting/workflow/WorkflowRunContext.kt | 44 +- .../alerting/workflow/WorkflowRunner.kt | 4 +- .../alerting/AlertingRestTestCase.kt | 2 +- .../alerting/DocumentMonitorRunnerIT.kt | 34 +- .../alerting/MonitorDataSourcesIT.kt | 3 +- .../alerting/model/WriteableTests.kt | 78 +- core/build.gradle | 1 + .../opensearchapi/OpenSearchExtensions.kt | 15 + 26 files changed, 1975 insertions(+), 177 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e53187c7b..81cd3a690 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.opensearch.action.ActionRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutAction import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction import org.opensearch.alerting.transport.TransportDeleteWorkflowAction +import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction import org.opensearch.alerting.transport.TransportExecuteWorkflowAction import org.opensearch.alerting.transport.TransportGetAlertsAction @@ -211,7 +213,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, 
TransportGetWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), - ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) + ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), + ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java) ) } @@ -322,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT, + AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES, LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT, LegacyOpenDistroAlertingSettings.BULK_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index e960b9da5..7694b7ce4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -49,9 +49,11 @@ import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.transport.TransportService import java.time.Instant import java.util.UUID +// TODO: raise a PR for the bucket-level monitor optimization as well; don't miss it object BucketLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) @@ -62,7 +64,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, - executionId: String + executionId: String, + transportService: TransportService?
): MonitorRunResult { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index daeb22945..a380b203f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListenerResponseHandler import org.opensearch.action.DocWriteRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse @@ -15,7 +16,11 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.GroupedActionListener import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutAction +import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults @@ -25,8 +30,6 @@ import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -35,6 +38,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.node.DiscoveryNode import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -53,6 +57,8 @@ import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.util.string import org.opensearch.core.action.ActionListener import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder @@ -67,11 +73,17 @@ import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder +import org.opensearch.transport.TransportException +import 
org.opensearch.transport.TransportRequestOptions +import org.opensearch.transport.TransportService import java.io.IOException import java.time.Instant import java.util.UUID import java.util.concurrent.atomic.AtomicLong import java.util.stream.Collectors +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -84,8 +96,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, - executionId: String + executionId: String, + transportService: TransportService?, ): MonitorRunResult { + if (transportService == null) + throw RuntimeException("transport service should not be null") logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) @@ -126,15 +141,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val updatedLastRunContext = lastRunContext.toMutableMap() - val queryToDocIds = mutableMapOf>() - val inputRunResults = mutableMapOf>() - val docsToQueries = mutableMapOf>() - try { // Resolve all passed indices to concrete indices + val clusterService = monitorCtx.clusterService!! val allConcreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, - monitorCtx.clusterService!!, + clusterService, monitorCtx.indexNameExpressionResolver!! ) if (allConcreteIndices.isEmpty()) { @@ -142,8 +154,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) } - monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) - monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( + val docLevelMonitorQueries = monitorCtx.docLevelMonitorQueries!! + docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) + docLevelMonitorQueries.indexDocLevelQueries( monitor = monitor, monitorId = monitor.id, monitorMetadata, @@ -168,24 +181,25 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() val queryingStartTimeMillis = System.currentTimeMillis() + val docLevelMonitorFanOutResponses: MutableList = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), - monitorCtx.clusterService!!, + clusterService, monitorCtx.indexNameExpressionResolver!! ) var lastWriteIndex: String? 
= null - if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || - IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + if (IndexUtils.isAlias(indexName, clusterService.state()) || + IndexUtils.isDataStream(indexName, clusterService.state()) ) { lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) } if (lastWriteIndex != null) { val lastWriteIndexCreationDate = - IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + IndexUtils.getCreationDateForIndex(lastWriteIndex, clusterService.state()) concreteIndices = IndexUtils.getNewestIndicesByCreationDate( concreteIndices, - monitorCtx.clusterService!!.state(), + clusterService.state(), lastWriteIndexCreationDate ) } @@ -193,8 +207,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") updatedIndexNames.add(updatedIndexName) - val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( - monitorCtx.clusterService!!.state(), + val conflictingFields = docLevelMonitorQueries.getAllConflictingFields( + clusterService.state(), concreteIndices ) @@ -205,7 +219,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor, periodStart, periodEnd, - monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + clusterService.state().metadata.index(concreteIndexName) ) MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) } @@ -216,10 +230,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx, concreteIndexName ) as MutableMap - if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || - IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + if (IndexUtils.isAlias(indexName, clusterService.state()) || + IndexUtils.isDataStream(indexName, clusterService.state()) ) { - if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + if (concreteIndexName == IndexUtils.getWriteIndex(indexName, clusterService.state())) { updatedLastRunContext.remove(lastWriteIndex) updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext } @@ -259,92 +273,99 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexUpdatedRunContext, updatedIndexName, concreteIndexName, + updatedIndexNames, + concreteIndices, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), ) + val shards = mutableSetOf() + shards.addAll(indexUpdatedRunContext.keys) + shards.remove("index") + shards.remove("shards_count") - fetchShardDataAndMaybeExecutePercolateQueries( - monitor, + val nodeMap = getNodes(monitorCtx) + val nodeShardAssignments = distributeShards( monitorCtx, - indexExecutionContext, - monitorMetadata, - inputRunResults, - docsToQueries, - transformedDocs, - docsSizeInBytes, - updatedIndexNames, - concreteIndicesSeenSoFar, - ArrayList(fieldsToBeQueried), - nonPercolateSearchesTimeTaken, - percolateQueriesTimeTaken, - totalDocsQueried, - docTransformTimeTaken - ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number - indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + nodeMap.keys.toList(), + shards.toList(), + concreteIndexName + ) + /* val dlmfor: DocLevelMonitorFanOutResponse = monitorCtx.client!!.suspendUntil { + execute(DocLevelMonitorFanOutAction.INSTANCE, docLevelMonitorFanOutRequest1, it) + } + val lastRunContextFromResponse = 
dlmfor.lastRunContexts as MutableMap<String, MutableMap<String, Any>> + updatedLastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap<String, Any> + logger.error(dlmfor)*/ + + nodeShardAssignments.forEach { + logger.debug("Fan out node ${it.key} is assigned shards ${it.value.map { shardId -> shardId.id }}") + } - } - } - /* if all indices are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end */ - if (transformedDocs.isNotEmpty()) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - updatedIndexNames, - concreteIndicesSeenSoFar, - inputRunResults, - docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried - ) - } - val took = System.currentTimeMillis() - queryingStartTimeMillis - logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in $executionId") - monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) - - /* - populate the map queryToDocIds with pairs of <DocLevelQuery object, set of matched doc ids> - this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser - */ - queries.forEach { - if (inputRunResults.containsKey(it.id)) { - queryToDocIds[it] = inputRunResults[it.id]!! - } - } - - val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id } + val responses: Collection<DocLevelMonitorFanOutResponse> = suspendCoroutine { cont -> + val listener = GroupedActionListener( + object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> { + override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) { + logger.debug("Fan out responses received for execution $executionId") + cont.resume(response) + } + + override fun onFailure(e: Exception) { + logger.error("Fan out failed", e) + if (e.cause is Exception) // unwrap remote transport exception + cont.resumeWithException(e.cause as Exception) + else + cont.resumeWithException(e) + } + }, + nodeMap.size + ) + val responseReader = Writeable.Reader { + DocLevelMonitorFanOutResponse(it) + } + for (node in nodeMap) { + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + node.key, + monitor, + dryrun, + monitorMetadata, + executionId, + listOf(indexExecutionContext), + nodeShardAssignments[node.key]!!.toList(), + workflowRunContext + ) - val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>() - // If there are no triggers defined, we still want to generate findings - if (monitor.triggers.isEmpty()) { - if (dryrun == false && monitor.id != Monitor.NO_ID) { - logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") - createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) - } - } else { - monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTrigger( - monitorCtx, - monitorResult, - it as DocumentLevelTrigger, - monitor, - idQueryMap, - docsToQueries, - queryToDocIds, - dryrun, - executionId = executionId, - workflowRunContext = workflowRunContext - ) + transportService.sendRequest( + node.value, + DocLevelMonitorFanOutAction.NAME, + docLevelMonitorFanOutRequest, + TransportRequestOptions.EMPTY, + object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) { + override fun handleException(e: TransportException) { + listener.onFailure(e) + } + + override fun handleResponse(response: DocLevelMonitorFanOutResponse) { + listener.onResponse(response) + } + } + ) + } + } + docLevelMonitorFanOutResponses.addAll(responses) } } - // Don't update monitor if this is a test monitor +
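+ // All fan-out responses are in; merge the per-node slices back into one monitor result:
+ // per-shard max sequence numbers into updatedLastRunContext, and the partial per-trigger
+ // and per-query results into combined trigger and input run results.
+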
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext) + val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses) + val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses) if (!isTempMonitor) { // If any error happened during trigger execution, upsert monitor error alert - val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) + val errorMessage = constructErrorMessageFromTriggerResults(triggerResults) if (errorMessage.isNotEmpty()) { monitorCtx.alertService!!.upsertMonitorErrorAlert( monitor = monitor, @@ -355,28 +376,28 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } else { onSuccessfulMonitorRun(monitorCtx, monitor) } - logger.error( - "Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " + - "execution $executionId" - ) MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true ) } - // TODO: Update the Document as part of the Trigger and return back the trigger action result - return monitorResult.copy(triggerResults = triggerResults) + return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults) } catch (e: Exception) { val errorMessage = ExceptionsHelper.detailedMessage(e) - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext) + if (!isTempMonitor) { + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext) + } logger.error("Failed running Document-level-monitor ${monitor.name}", e) val alertingException = AlertingException( errorMessage, RestStatus.INTERNAL_SERVER_ERROR, e ) - return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + return monitorResult.copy( + error = alertingException, + inputResults = InputRunResults(emptyList(), alertingException) + ) } finally { logger.error( "PERF_DEBUG_STATS: Monitor ${monitor.id} " + @@ -388,6 +409,52 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } + private fun buildInputRunResults(docLevelMonitorFanOutResponses: MutableList): InputRunResults { + val inputRunResults = mutableMapOf>() + var error: Exception? 
= null + for (response in docLevelMonitorFanOutResponses) { + if (error == null && response.inputResults.error != null) + error = response.inputResults.error + + val partialResult = response.inputResults.results + for (result in partialResult) { + for (id in result.keys) { + inputRunResults.getOrPut(id) { mutableSetOf() }.addAll(result[id] as Collection<String>) + } + } + } + return InputRunResults(listOf(inputRunResults), error) + } + + private fun buildTriggerResults( + docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>, + ): MutableMap<String, DocumentLevelTriggerRunResult> { + val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>() + for (res in docLevelMonitorFanOutResponses) { + for (triggerId in res.triggerResults.keys) { + val documentLevelTriggerRunResult = res.triggerResults[triggerId] + if (documentLevelTriggerRunResult != null) { + if (!triggerResults.containsKey(triggerId)) { + triggerResults[triggerId] = documentLevelTriggerRunResult + } else { + val currVal = triggerResults[triggerId] + val newTriggeredDocs = mutableListOf<String>() + newTriggeredDocs.addAll(currVal!!.triggeredDocs) + newTriggeredDocs.addAll(documentLevelTriggerRunResult.triggeredDocs) + triggerResults.put( + triggerId, + currVal.copy( + triggeredDocs = newTriggeredDocs, + error = if (currVal.error != null) currVal.error else documentLevelTriggerRunResult.error + ) + ) + } + } + } + } + return triggerResults + } + private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) { monitorCtx.alertService!!.clearMonitorErrorAlert(monitor) if (monitor.dataSources.alertsHistoryIndex != null) { @@ -400,7 +467,7 @@ } private fun constructErrorMessageFromTriggerResults( - triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null + triggerResults: MutableMap<String, DocumentLevelTriggerRunResult>? = null, ): String { var errorMessage = "" if (triggerResults != null) { @@ -427,7 +494,7 @@ queryToDocIds: Map<DocLevelQuery, Set<String>>, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, - executionId: String + executionId: String, ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) @@ -482,7 +549,8 @@ val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { for (alert in alerts) { - val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) + val actionResults = + this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) } @@ -500,7 +568,11 @@ val updatedAlerts = alerts.map { alert -> val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap()) val actionExecutionResults = actionResults.values.map { actionRunResult -> - ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0) + ActionExecutionResult( + actionRunResult.actionId, + actionRunResult.executionTime, + if (actionRunResult.throttled) 1 else 0 + ) } alert.copy(actionExecutionResults = actionExecutionResults) } @@ -584,7 +656,7 @@ 
private fun publishFinding( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - finding: Finding + finding: Finding, ) { val publishFindingsRequest = PublishFindingsRequest(monitor.id, finding) AlertingPluginInterface.publishFinding( @@ -598,6 +670,35 @@ ) } + private fun updateLastRunContextFromFanOutResponses( + docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>, + updatedLastRunContext: MutableMap<String, MutableMap<String, Any>>, + ) { + // Prepare updatedLastRunContext for each index + for (indexName in updatedLastRunContext.keys) { + for (fanOutResponse in docLevelMonitorFanOutResponses) { + // each fan-out response carries updated run contexts only for the shards assigned to that node + val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any> + + if (fanOutResponse.lastRunContexts.contains(indexName)) { + val partialUpdatedIndexLastRunContext = fanOutResponse.lastRunContexts[indexName] as MutableMap<String, Any> + partialUpdatedIndexLastRunContext.keys.forEach { + + val seqNo = partialUpdatedIndexLastRunContext[it].toString().toIntOrNull() + if ( + it != "shards_count" && + it != "index" && + seqNo != null && + seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO.toInt() + ) { + indexLastRunContext[it] = seqNo + } + } + } + } + } + } + private fun initializeNewLastRunContext( lastRunContext: Map<String, Any>, monitorCtx: MonitorRunnerExecutionContext, @@ -632,7 +733,7 @@ monitor: Monitor, periodStart: Instant, periodEnd: Instant, - indexMetadata: IndexMetadata + indexMetadata: IndexMetadata, ): Boolean { val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L @@ -643,7 +744,12 @@ * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. 
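* The search sorts the shard's documents on _seq_no in descending order with size 1, so the single returned hit carries the shard's current maximum sequence number.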
*/ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { + private suspend fun getMaxSeqNo( + client: Client, + index: String, + shard: String, + nonPercolateSearchesTimeTaken: AtomicLong, + ): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -666,7 +772,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } private fun getShardsCount(clusterService: ClusterService, index: String): Int { - val allShards: List = clusterService!!.state().routingTable().allShards(index) + val allShards: List = clusterService.state().routingTable().allShards(index) return allShards.filter { it.primary() }.size } @@ -691,7 +797,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { percolateQueriesTimeTaken: AtomicLong, totalDocsQueried: AtomicLong, docTransformTimeTake: AtomicLong, - updateLastRunContext: (String, String) -> Unit + updateLastRunContext: (String, String) -> Unit, ) { val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -735,7 +841,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) if ( transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes, + transformedDocs.size, + monitorCtx + ) ) { performPercolateQueryAndResetCounters( monitorCtx, @@ -787,7 +897,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { inputRunResults: MutableMap>, docsToQueries: MutableMap>, percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong + totalDocsQueried: AtomicLong, ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -875,7 +985,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, - percolateQueriesTimeTaken: AtomicLong + percolateQueriesTimeTaken: AtomicLong, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -932,6 +1042,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { percolateQueriesTimeTaken.getAndAdd(response.took.millis) return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { val boolQueryBuilder = QueryBuilders.boolQuery() @@ -1007,7 +1118,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { conflictingFields: List, fieldNameSuffixPattern: String, fieldNameSuffixIndex: String, - fieldNamePrefix: String + fieldNamePrefix: String, ) { val tempMap = mutableMapOf() val it: MutableIterator> = jsonAsMap.entries.iterator() @@ -1044,19 +1155,67 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. 
* */ - private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + private fun isInMemoryDocsSizeExceedingMemoryLimit( + docsBytesSize: Long, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes return docsBytesSize > thresholdBytes } - private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit( + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory return numDocs >= maxNumDocsThreshold } + private suspend fun getNodes(monitorCtx: MonitorRunnerExecutionContext): MutableMap { + return monitorCtx.clusterService!!.state().nodes.dataNodes + } + + private fun distributeShards( + monitorCtx: MonitorRunnerExecutionContext, + allNodes: List, + shards: List, + index: String, + ): Map> { + + val totalShards = shards.size + val numFanOutNodes = allNodes.size.coerceAtMost((totalShards + 1) / 2) + val totalNodes = monitorCtx.totalNodesFanOut.coerceAtMost(numFanOutNodes) + val shardsPerNode = totalShards / totalNodes + var shardsRemaining = totalShards % totalNodes + + val shardIdList = shards.map { + ShardId(monitorCtx.clusterService!!.state().metadata.index(index).index, it.toInt()) + } + val nodes = allNodes.subList(0, totalNodes) + + val nodeShardAssignments = mutableMapOf>() + var idx = 0 + for (node in nodes) { + val nodeShardAssignment = mutableSetOf() + for (i in 1..shardsPerNode) { + nodeShardAssignment.add(shardIdList[idx++]) + } + nodeShardAssignments[node] = nodeShardAssignment + } + + for (node in nodes) { + if (shardsRemaining == 0) { + break + } + nodeShardAssignments[node]!!.add(shardIdList[idx++]) + --shardsRemaining + } + return nodeShardAssignments + } + /** * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. @@ -1065,6 +1224,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var indexName: String, var concreteIndexName: String, var docId: String, - var docSource: BytesReference + var docSource: BytesReference, ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index f8d5fe686..73d41f9e8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -32,6 +32,7 @@ import org.opensearch.commons.alerting.model.Table import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.notifications.model.NotificationConfigInfo import org.opensearch.core.common.Strings +import org.opensearch.transport.TransportService import java.time.Instant abstract class MonitorRunner { @@ -43,7 +44,8 @@ abstract class MonitorRunner { periodEnd: Instant, dryRun: Boolean, workflowRunContext: WorkflowRunContext? 
= null, - executionId: String + executionId: String, + transportService: TransportService? ): MonitorRunResult<*> suspend fun runAction( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 3b17ceebe..a9bf13273 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -49,5 +49,9 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES, + @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, + @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = + AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index b8719b4b0..ac911c486 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -13,6 +13,12 @@ import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.action.ExecuteMonitorAction +import org.opensearch.alerting.action.ExecuteMonitorRequest +import org.opensearch.alerting.action.ExecuteMonitorResponse +import org.opensearch.alerting.action.ExecuteWorkflowAction +import org.opensearch.alerting.action.ExecuteWorkflowRequest +import org.opensearch.alerting.action.ExecuteWorkflowResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts import org.opensearch.alerting.core.JobRunner @@ -21,9 +27,12 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FAN_OUT_NODES import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT @@ -40,6 +49,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.lifecycle.AbstractLifecycleComponent import org.opensearch.common.settings.Settings +import 
org.opensearch.common.unit.TimeValue import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob @@ -53,10 +63,12 @@ import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript import org.opensearch.threadpool.ThreadPool +import org.opensearch.transport.TransportService import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset import java.util.UUID +import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() { @@ -173,6 +185,24 @@ monitorCtx.maxActionableAlertCount = it } + monitorCtx.totalNodesFanOut = DOC_LEVEL_MONITOR_FAN_OUT_NODES.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FAN_OUT_NODES) { + monitorCtx.totalNodesFanOut = it + } + + monitorCtx.percQueryMaxNumDocsInMemory = + AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { + monitorCtx.percQueryMaxNumDocsInMemory = it + } + + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) { + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it + } + monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) return this @@ -261,29 +291,70 @@ "PERF_DEBUG: executing workflow ${job.id} on node " + monitorCtx.clusterService!!.state().nodes().localNode.id ) - runJob(job, periodStart, periodEnd, false) + + monitorCtx.client!!.suspendUntil<ExecuteWorkflowResponse> { + monitorCtx.client!!.execute( + ExecuteWorkflowAction.INSTANCE, + ExecuteWorkflowRequest(false, TimeValue(1, TimeUnit.DAYS), job.id, job), + it + ) + } } } + is Monitor -> { launch { logger.debug( "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + monitorCtx.clusterService!!.state().nodes().localNode.id ) - runJob(job, periodStart, periodEnd, false) + val executeMonitorRequest = ExecuteMonitorRequest( + false, + TimeValue(periodEnd.toEpochMilli()), + job.id, + job + ) + monitorCtx.client!!.suspendUntil<ExecuteMonitorResponse> { + monitorCtx.client!!.execute( + ExecuteMonitorAction.INSTANCE, + executeMonitorRequest, + it + ) + } } } + else -> { throw IllegalArgumentException("Invalid job type") } } } - suspend fun runJob(workflow: Workflow, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): WorkflowRunResult { - return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun) + suspend fun runJob( + workflow: Workflow, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService, + ): WorkflowRunResult { + return CompositeWorkflowRunner.runWorkflow( + workflow, + monitorCtx, + periodStart, + periodEnd, + dryrun, + transportService ) + } - suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): MonitorRunResult<*> { + suspend fun runJob( 
job: ScheduledJob, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService, + ): MonitorRunResult<*> { // Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade and the schema mapping // has not been updated. if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { @@ -303,7 +374,13 @@ if (job is Workflow) { logger.info("Executing scheduled workflow - id: ${job.id}, periodStart: $periodStart, periodEnd: $periodEnd, dryrun: $dryrun") - CompositeWorkflowRunner.runWorkflow(workflow = job, monitorCtx, periodStart, periodEnd, dryrun) + monitorCtx.client!!.suspendUntil<ExecuteWorkflowResponse> { + monitorCtx.client!! + .execute( + ExecuteWorkflowAction.INSTANCE, + ExecuteWorkflowRequest(false, TimeValue(1, TimeUnit.DAYS), job.id, job), + it + ) + } } val monitor = job as Monitor val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}" @@ -312,11 +389,35 @@ "periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId" ) val runResult = if (monitor.isBucketLevelMonitor()) { - BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + BucketLevelMonitorRunner.runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + DocumentLevelMonitorRunner.runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) } else { - QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + QueryLevelMonitorRunner.runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) } return runResult } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index 691071517..b9f3b94c7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -16,6 +16,7 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.transport.TransportService import java.time.Instant object QueryLevelMonitorRunner : MonitorRunner() { @@ -28,7 +29,8 @@ object QueryLevelMonitorRunner : MonitorRunner() { periodEnd: Instant, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, - executionId: String + executionId: String, + transportService: TransportService? 
): MonitorRunResult<QueryLevelTriggerRunResult> { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt new file mode 100644 index 000000000..c03d95942 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionType + +class DocLevelMonitorFanOutAction private constructor() : ActionType<DocLevelMonitorFanOutResponse>(NAME, ::DocLevelMonitorFanOutResponse) { + companion object { + val INSTANCE = DocLevelMonitorFanOutAction() + const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt new file mode 100644 index 000000000..f60fdd0ea --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.alerting.model.IndexExecutionContext +import org.opensearch.alerting.model.MonitorMetadata +import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + + val nodeId: String + val monitor: Monitor + val dryRun: Boolean + val monitorMetadata: MonitorMetadata + val executionId: String + val indexExecutionContexts: List<IndexExecutionContext> + val shardIds: List<ShardId> + val workflowRunContext: WorkflowRunContext?
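+ // Wire-format contract: the StreamInput constructor and writeTo() below must read and write
+ // fields in exactly the same order (nodeId, monitor, dryRun, monitorMetadata, executionId,
+ // indexExecutionContexts, shardIds, then a boolean-guarded optional workflowRunContext);
+ // a mismatch corrupts every field that follows on the wire.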
+ + constructor( + nodeId: String, + monitor: Monitor, + dryRun: Boolean, + monitorMetadata: MonitorMetadata, + executionId: String, + indexExecutionContexts: List<IndexExecutionContext>, + shardIds: List<ShardId>, + workflowRunContext: WorkflowRunContext?, + ) : super() { + this.nodeId = nodeId + this.monitor = monitor + this.dryRun = dryRun + this.monitorMetadata = monitorMetadata + this.executionId = executionId + this.indexExecutionContexts = indexExecutionContexts + this.shardIds = shardIds + this.workflowRunContext = workflowRunContext + require(shardIds.isNotEmpty()) { "shardIds must not be empty" } + require(indexExecutionContexts.isNotEmpty()) { "indexExecutionContexts must not be empty" } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + nodeId = sin.readString(), + monitor = Monitor.readFrom(sin)!!, + dryRun = sin.readBoolean(), + monitorMetadata = MonitorMetadata.readFrom(sin), + executionId = sin.readString(), + indexExecutionContexts = sin.readList { IndexExecutionContext(sin) }, + shardIds = sin.readList(::ShardId), + workflowRunContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else null, + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(nodeId) + monitor.writeTo(out) + out.writeBoolean(dryRun) + monitorMetadata.writeTo(out) + out.writeString(executionId) + out.writeCollection(indexExecutionContexts) + out.writeCollection(shardIds) + out.writeBoolean(workflowRunContext != null) + workflowRunContext?.writeTo(out) + } + + override fun validate(): ActionRequestValidationException? { + var actionValidationException: ActionRequestValidationException? = null + if (shardIds.isEmpty()) { + actionValidationException = ActionRequestValidationException() + actionValidationException.addValidationError("shard_ids is null or empty") + } + if (indexExecutionContexts.isEmpty()) { + if (actionValidationException == null) + actionValidationException = ActionRequestValidationException() + actionValidationException.addValidationError("index_execution_contexts is null or empty") + } + return actionValidationException + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("node_id", nodeId) + .field("monitor", monitor) + .field("dry_run", dryRun) + .field("execution_id", executionId) + .field("index_execution_contexts", indexExecutionContexts) + .field("shard_ids", shardIds) + .field("workflow_run_context", workflowRunContext) + return builder.endObject() + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt new file mode 100644 index 000000000..1a2bc2203 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.model.InputRunResults +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
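+ // Each fan-out response carries only the slice of state computed on one node: per-shard
+ // last-run contexts, partial input results, and partial trigger results. The coordinator in
+ // DocumentLevelMonitorRunner merges these slices via updateLastRunContextFromFanOutResponses,
+ // buildInputRunResults and buildTriggerResults.
+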
val nodeId: String + val executionId: String + val monitorId: String + val shardIdFailureMap: Map<String, Exception> + val findingIds: List<String> + + // for shards not delegated to this node, the sequence number is -3 (UNASSIGNED_SEQ_NO, i.e. the shard was not queried) + val lastRunContexts: MutableMap<String, Any> // partial + val inputResults: InputRunResults // partial + val triggerResults: Map<String, DocumentLevelTriggerRunResult> // partial + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + nodeId = sin.readString(), + executionId = sin.readString(), + monitorId = sin.readString(), + shardIdFailureMap = sin.readMap() as Map<String, Exception>, + findingIds = sin.readStringList(), + lastRunContexts = sin.readMap() as MutableMap<String, Any>, + inputResults = InputRunResults.readFrom(sin), + triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)) + ) + + constructor( + nodeId: String, + executionId: String, + monitorId: String, + shardIdFailureMap: Map<String, Exception>, + findingIds: List<String>, + lastRunContexts: MutableMap<String, Any>, + inputResults: InputRunResults = InputRunResults(), // partial + triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(), + ) : super() { + this.nodeId = nodeId + this.executionId = executionId + this.monitorId = monitorId + this.shardIdFailureMap = shardIdFailureMap + this.findingIds = findingIds + this.lastRunContexts = lastRunContexts + this.inputResults = inputResults + this.triggerResults = triggerResults + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(nodeId) + out.writeString(executionId) + out.writeString(monitorId) + out.writeMap(shardIdFailureMap) + out.writeStringCollection(findingIds) + out.writeMap(lastRunContexts) + inputResults.writeTo(out) + out.writeMap( + triggerResults, + StreamOutput::writeString, + { stream, stats -> stats.writeTo(stream) } + ) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("last_run_contexts", lastRunContexts) + .endObject() + return builder + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: Map<String, DocumentLevelTriggerRunResult>?): Map<String, DocumentLevelTriggerRunResult> { + return map as Map<String, DocumentLevelTriggerRunResult> + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt index 9d98aab42..d3d0b83b3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt @@ -12,11 +12,12 @@ import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.script.ScriptException import java.io.IOException +@Suppress("IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE") data class DocumentLevelTriggerRunResult( override var triggerName: String, var triggeredDocs: List<String>, override var error: Exception?, - var actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf() + var actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf(), ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class) @@ -31,7 +32,7 @@ override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) return builder - .field("triggeredDocs", triggeredDocs as List<String>) + .field("triggeredDocs", triggeredDocs) .field("action_results", actionResultsMap as Map<String, Any>) } diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt index 97156eb96..888ac9f12 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -6,13 +6,62 @@ package org.opensearch.alerting.model import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException data class IndexExecutionContext( val queries: List<DocLevelQuery>, - val lastRunContext: MutableMap<String, Any>, - val updatedLastRunContext: MutableMap<String, Any>, + val lastRunContext: MutableMap<String, Any>, // previous execution + val updatedLastRunContext: MutableMap<String, Any>, // without sequence numbers val indexName: String, val concreteIndexName: String, + val updatedIndexNames: List<String>, + val concreteIndexNames: List<String>, val conflictingFields: List<String>, - val docIds: List<String>? = null -) + val docIds: List<String>? = emptyList(), +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + queries = sin.readList { DocLevelQuery(sin) }, + lastRunContext = sin.readMap(), + updatedLastRunContext = sin.readMap(), + indexName = sin.readString(), + concreteIndexName = sin.readString(), + updatedIndexNames = sin.readStringList(), + concreteIndexNames = sin.readStringList(), + conflictingFields = sin.readStringList(), + docIds = sin.readStringList() + ) + + override fun writeTo(out: StreamOutput) { + out.writeCollection(queries) + out.writeMap(lastRunContext) + out.writeMap(updatedLastRunContext) + out.writeString(indexName) + out.writeString(concreteIndexName) + out.writeStringCollection(updatedIndexNames) + out.writeStringCollection(concreteIndexNames) + out.writeStringCollection(conflictingFields) + out.writeOptionalStringCollection(docIds) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("queries", queries) + .field("last_run_context", lastRunContext) + .field("updated_last_run_context", updatedLastRunContext) + .field("index_name", indexName) + .field("concrete_index_name", concreteIndexName) + .field("updated_index_names", updatedIndexNames) + .field("concrete_index_names", concreteIndexNames) + .field("conflicting_fields", conflictingFields) + .field("doc_ids", docIds) + .endObject() + return builder + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt index c3aec89f2..9a7e20985 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/TriggerRunResult.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.model import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable import org.opensearch.core.xcontent.ToXContent @@ -15,9 +16,15 @@ import java.time.Instant abstract class TriggerRunResult( open var triggerName: String, - open var error: Exception? = null + open var error: Exception? 
= null, ) : Writeable, ToXContent { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), + if (sin.readBoolean()) sin.readException() else null + ) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() .field("name", triggerName) @@ -43,7 +50,10 @@ @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(triggerName) - out.writeException(error) + if (error == null) out.writeBoolean(false) else { + out.writeBoolean(true) + out.writeException(error) + } } companion object { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 1bf2dc663..acd2cd0b7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,9 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 + const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 + const val DEFAULT_FAN_OUT_NODES = 1000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -36,6 +39,18 @@ Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** Defines the maximum number of data nodes a document-level monitor execution fans out to. Shards of the + * monitor's input indices are distributed across at most this many nodes, and each node runs the query and + * percolate logic against its assigned shards locally. + */ + val DOC_LEVEL_MONITOR_FAN_OUT_NODES = Setting.intSetting( + "plugins.alerting.monitor.doc_level_monitor_fan_out_nodes", + DEFAULT_FAN_OUT_NODES, + 1, + Int.MAX_VALUE, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate @@ -44,7 +59,7 @@ */ val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", - 10000, 1000, + DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000, Setting.Property.NodeScope, Setting.Property.Dynamic ) @@ -53,7 +68,7 @@ * Enabled by default. If disabled, will fetch the entire source of documents while fetching data from shards. 
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
new file mode 100644
index 000000000..304082365
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
@@ -0,0 +1,1075 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.alerting.transport
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchSecurityException
+import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.search.SearchAction
+import org.opensearch.action.search.SearchRequest
+import org.opensearch.action.search.SearchResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.action.support.HandledTransportAction
+import org.opensearch.action.support.WriteRequest
+import org.opensearch.alerting.MonitorRunnerExecutionContext
+import org.opensearch.alerting.MonitorRunnerService
+import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
+import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
+import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
+import org.opensearch.alerting.action.GetDestinationsAction
+import org.opensearch.alerting.action.GetDestinationsRequest
+import org.opensearch.alerting.action.GetDestinationsResponse
+import org.opensearch.alerting.model.ActionRunResult
+import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
+import org.opensearch.alerting.model.IndexExecutionContext
+import org.opensearch.alerting.model.InputRunResults
+import org.opensearch.alerting.model.MonitorMetadata
+import org.opensearch.alerting.model.MonitorRunResult
+import org.opensearch.alerting.model.destination.Destination
+import org.opensearch.alerting.model.userErrorMessage
+import org.opensearch.alerting.opensearchapi.InjectorContextElement
+import org.opensearch.alerting.opensearchapi.suspendUntil
+import org.opensearch.alerting.opensearchapi.withClosableContext
+import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
+import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
+import org.opensearch.alerting.script.TriggerExecutionContext
+import org.opensearch.alerting.settings.AlertingSettings
+import org.opensearch.alerting.util.AlertingException
+import org.opensearch.alerting.util.defaultToPerExecutionAction
+import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
+import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils
+import org.opensearch.alerting.util.destinationmigration.getTitle
+import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
+import org.opensearch.alerting.util.destinationmigration.sendNotification
+import org.opensearch.alerting.util.getActionExecutionPolicy
+import org.opensearch.alerting.util.isAllowed
+import org.opensearch.alerting.util.isTestAction
+import org.opensearch.alerting.util.use
+import org.opensearch.alerting.workflow.WorkflowRunContext
+import org.opensearch.client.Client
+import org.opensearch.client.node.NodeClient
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.settings.Settings
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.common.xcontent.XContentType
+import org.opensearch.commons.alerting.model.ActionExecutionResult
+import org.opensearch.commons.alerting.model.Alert
+import org.opensearch.commons.alerting.model.DocLevelMonitorInput
+import org.opensearch.commons.alerting.model.DocLevelQuery
+import org.opensearch.commons.alerting.model.DocumentLevelTrigger
+import org.opensearch.commons.alerting.model.Finding
+import org.opensearch.commons.alerting.model.Monitor
+import org.opensearch.commons.alerting.model.Table
+import org.opensearch.commons.alerting.model.action.Action
+import org.opensearch.commons.alerting.model.action.PerAlertActionScope
+import org.opensearch.commons.alerting.util.string
+import org.opensearch.commons.notifications.model.NotificationConfigInfo
+import org.opensearch.core.action.ActionListener
+import org.opensearch.core.common.Strings
+import org.opensearch.core.common.bytes.BytesReference
+import org.opensearch.core.index.shard.ShardId
+import org.opensearch.core.rest.RestStatus
+import org.opensearch.core.xcontent.ToXContent
+import org.opensearch.core.xcontent.XContentBuilder
+import org.opensearch.index.query.BoolQueryBuilder
+import org.opensearch.index.query.Operator
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indices.IndexClosedException
+import org.opensearch.percolator.PercolateQueryBuilderExt
+import org.opensearch.search.SearchHit
+import org.opensearch.search.SearchHits
+import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.search.sort.SortOrder
+import org.opensearch.tasks.Task
+import org.opensearch.transport.TransportService
+import java.io.IOException
+import java.time.Instant
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+import java.util.stream.Collectors
+
+private val log = LogManager.getLogger(TransportDocLevelMonitorFanOutAction::class.java)
+private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
+
+class TransportDocLevelMonitorFanOutAction
+@Inject constructor(
+    transportService: TransportService,
+    val client: Client,
+    val actionFilters: ActionFilters,
+    val runner: MonitorRunnerService,
+    val settings: Settings,
+) : HandledTransportAction<DocLevelMonitorFanOutRequest, DocLevelMonitorFanOutResponse>(
+    DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
+),
+    SecureTransportAction {
+
+    @Volatile
+    override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
+
+    override fun doExecute(
+        task: Task,
+        request: DocLevelMonitorFanOutRequest,
+        listener: ActionListener<DocLevelMonitorFanOutResponse>,
+    ) {
+        scope.launch {
+            executeMonitor(
+                request,
+                monitorCtx = runner.monitorCtx,
+                listener = listener
+            )
+        }
+    }
+
+    private suspend fun executeMonitor(
+        request: DocLevelMonitorFanOutRequest,
+        monitorCtx: MonitorRunnerExecutionContext,
+        listener: ActionListener<DocLevelMonitorFanOutResponse>,
+    ) {
+        try {
+            val monitor = request.monitor
+            var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, Instant.now(), Instant.now())
+            // todo periodStart periodEnd
+            var nonPercolateSearchesTimeTaken = AtomicLong(0)
+            var percolateQueriesTimeTaken = AtomicLong(0)
+            var totalDocsQueried = AtomicLong(0)
+            var docTransformTimeTaken = AtomicLong(0)
+            val shardIdFailureMap = mutableMapOf<String, Exception>()
+            val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames
+            val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames
+            val monitorMetadata = request.monitorMetadata
+            val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
+            val inputRunResults = mutableMapOf<String, MutableSet<String>>()
+            val docsToQueries = mutableMapOf<String, MutableList<String>>()
+            val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
+            val docsSizeInBytes = AtomicLong(0)
+            val shardIds = request.shardIds
+            val indexShardsMap: MutableMap<String, MutableList<Int>> = mutableMapOf()
+            val queryingStartTimeMillis = System.currentTimeMillis()
+            for (shardId in shardIds) {
+                if (indexShardsMap.containsKey(shardId.indexName)) {
+                    indexShardsMap[shardId.indexName]!!.add(shardId.id)
+                } else {
+                    indexShardsMap[shardId.indexName] = mutableListOf(shardId.id)
+                }
+            }
+            val lastRunContext = mutableMapOf<String, MutableMap<String, Any>>()
+            val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
+            val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
+            val fieldsToBeQueried = mutableSetOf<String>()
+            for (it in queries) {
+                if (it.queryFieldNames.isEmpty()) {
+                    fieldsToBeQueried.clear()
+                    log.debug(
+                        "Monitor ${request.monitor.id} : " +
+                            "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
+                            "Cannot optimize monitor to fetch only query-relevant fields. " +
+                            "Querying entire doc source."
+                    )
+                    break
+                }
+                fieldsToBeQueried.addAll(it.queryFieldNames)
+            }
+            for (entry in indexShardsMap) {
+                val indexExecutionContext =
+                    request.indexExecutionContexts.stream()
+                        .filter { it.concreteIndexName.equals(entry.key) }.findAny()
+                        .get()
+                fetchShardDataAndMaybeExecutePercolateQueries(
+                    request.monitor,
+                    entry.value,
+                    monitorCtx,
+                    indexExecutionContext,
+                    request.monitorMetadata,
+                    inputRunResults,
+                    docsToQueries,
+                    transformedDocs,
+                    docsSizeInBytes,
+                    indexExecutionContext.updatedIndexNames,
+                    indexExecutionContext.concreteIndexNames,
+                    ArrayList(fieldsToBeQueried),
+                    nonPercolateSearchesTimeTaken,
+                    percolateQueriesTimeTaken,
+                    totalDocsQueried,
+                    docTransformTimeTaken,
+                    shardIdFailureMap,
+                ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
+                    indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
+                }
+                lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext
+            }
+
+            /* If all indices are covered but the in-memory docs size limit was never breached, we still need to
+               submit the remaining docs as one percolate query at the end. */
+            if (transformedDocs.isNotEmpty()) {
+                performPercolateQueryAndResetCounters(
+                    monitorCtx,
+                    transformedDocs,
+                    docsSizeInBytes,
+                    monitor,
+                    monitorMetadata,
+                    updatedIndexNames,
+                    concreteIndexNames,
+                    inputRunResults,
+                    docsToQueries,
+                    percolateQueriesTimeTaken,
+                    totalDocsQueried
+                )
+            }
+            val took = System.currentTimeMillis() - queryingStartTimeMillis
+            log.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}")
+            monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
+
+            /*
+             * Populate queryToDocIds with pairs of <DocLevelQuery object, set of matching doc ids>.
+             * This fixes the issue of passing id, name, tags fields of the DocLevelQuery object correctly to TriggerExpressionParser.
+             */
+            queries.forEach {
+                if (inputRunResults.containsKey(it.id)) {
+                    queryToDocIds[it] = inputRunResults[it.id]!!
+                }
+            }
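+            /*
+             * Illustrative example: if inputRunResults is
+             *   { "q1" -> ["3|logs-000001", "7|logs-000001"] }
+             * and queries contains DocLevelQuery(id = "q1", name = "errors", tags = [...]),
+             * then queryToDocIds becomes { <that DocLevelQuery> -> ["3|logs-000001", "7|logs-000001"] },
+             * keyed by the full query object rather than the bare id string.
+             */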
+ } + } + + val idQueryMap: Map = queries.associateBy { it.id } + + val triggerResults = mutableMapOf() + // If there are no triggers defined, we still want to generate findings + if (monitor.triggers.isEmpty()) { + if (request.dryRun == false && monitor.id != Monitor.NO_ID) { + log.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) + } + } else { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTrigger( + monitorCtx, + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryToDocIds, + request.dryRun, + executionId = request.executionId, + workflowRunContext = request.workflowRunContext + ) + } + } + + // If any error happened during trigger execution, upsert monitor error alert + val errorMessage = + constructErrorMessageFromTriggerResults(triggerResults = triggerResults) + if (errorMessage.isNotEmpty()) { + monitorCtx.alertService!!.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, + executionId = request.executionId, + request.workflowRunContext + ) + } else { + onSuccessfulMonitorRun(monitorCtx, monitor) + } + listener.onResponse( + DocLevelMonitorFanOutResponse( + nodeId = monitorCtx.clusterService!!.localNode().id, + executionId = request.executionId, + monitorId = monitor.id, + shardIdFailureMap = shardIdFailureMap, + findingIds = emptyList(), + lastRunContext as MutableMap, + InputRunResults(listOf(inputRunResults)), + triggerResults + ) + ) + } catch (e: Exception) { + log.error("${request.monitor.id} Failed to run fan_out on node ${monitorCtx.clusterService!!.localNode().id} due to error", e) + listener.onFailure(e) + } + } + + private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) { + monitorCtx.alertService!!.clearMonitorErrorAlert(monitor) + if (monitor.dataSources.alertsHistoryIndex != null) { + monitorCtx.alertService!!.moveClearedErrorAlertsToHistory( + monitor.id, + monitor.dataSources.alertsIndex, + monitor.dataSources.alertsHistoryIndex!! + ) + } + } + + private fun constructErrorMessageFromTriggerResults( + triggerResults: MutableMap? = null, + ): String { + var errorMessage = "" + if (triggerResults != null) { + val triggersErrorBuilder = StringBuilder() + triggerResults.forEach { + if (it.value.error != null) { + triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ") + } + } + if (triggersErrorBuilder.isNotEmpty()) { + errorMessage = "Trigger errors: $triggersErrorBuilder" + } + } + return errorMessage + } + + private suspend fun createFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + docsToQueries: MutableMap>, + idQueryMap: Map, + shouldCreateFinding: Boolean, + workflowExecutionId: String? = null, + ): List> { + + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() + + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} + + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") + + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + log.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) + .opType(DocWriteRequest.OpType.CREATE) + } + } + + if (indexRequests.isNotEmpty()) { + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + log.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + log.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + } + } + return findingDocPairs + } + + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( + monitor: Monitor, + shardList: MutableList, + monitorCtx: MonitorRunnerExecutionContext, + indexExecutionCtx: IndexExecutionContext, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, + monitorInputIndices: List, + concreteIndices: List, + fieldsToBeQueried: List, + nonPercolateSearchesTimeTaken: AtomicLong, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong, + docTransformTimeTake: AtomicLong, + shardIdFailureMap: MutableMap, + updateLastRunContext: (String, String) -> Unit, + ) { + for (i in shardList) { + val shard = i.toString() + try { + val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED + var to: Long = Long.MAX_VALUE + while (to >= from) { + val hits: SearchHits = searchShard( + monitorCtx, + indexExecutionCtx.concreteIndexName, + shard, + from, + to, + indexExecutionCtx.docIds, + fieldsToBeQueried, + nonPercolateSearchesTimeTaken + ) + if (hits.hits.isEmpty()) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) + break + } + if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed + + updateLastRunContext(shard, hits.hits[0].seqNo.toString()) + to = hits.hits[0].seqNo - 10000L + } else { + to -= 10000L + } + val startTime = System.currentTimeMillis() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexExecutionCtx.indexName, + indexExecutionCtx.concreteIndexName, + monitor.id, + 
indexExecutionCtx.conflictingFields, + docsSizeInBytes + ) + ) + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes, + transformedDocs.size, + monitorCtx + ) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried + ) + } + docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) + } + } catch (e: Exception) { + log.error( + "Monitor ${monitor.id} :" + + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + + "Error: ${e.message}", + e + ) + val s = ShardId( + indexExecutionCtx.concreteIndexName, + monitorCtx.clusterService!!.state().metadata.index(indexExecutionCtx.concreteIndexName).indexUUID, + i // shard id + ) + shardIdFailureMap[s.toString()] = e + if (e is IndexClosedException) { + throw e + } + } + } + } + + /** Executes search query on given shard of given index to fetch docs with sequence number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ + private suspend fun searchShard( + monitorCtx: MonitorRunnerExecutionContext, + index: String, + shard: String, + prevSeqNo: Long?, + maxSeqNo: Long, + docIds: List? = null, + fieldsToFetch: List, + nonPercolateSearchesTimeTaken: AtomicLong, + ): SearchHits { + if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { + return SearchHits.empty() + } + val boolQueryBuilder = BoolQueryBuilder() + boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) + + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + + val request: SearchRequest = SearchRequest() + .indices(index) + .preference("_shards:$shard") + .source( + SearchSourceBuilder() + .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) + .query(boolQueryBuilder) + .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k + ) + + if (AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } + val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } + if (response.status() !== RestStatus.OK) { + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") + } + nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) + return response.hits + } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. 
+ */ + private data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference, + ) + + /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ + private suspend fun runPercolateQueryOnTransformedDocs( + monitorCtx: MonitorRunnerExecutionContext, + docs: MutableList>, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + concreteIndices: List, + monitorInputIndices: List, + percolateQueriesTimeTaken: AtomicLong, + ): SearchHits { + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) + if (monitor.id.isNotEmpty()) { + boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) + } + boolQueryBuilder.filter(percolateQueryBuilder) + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + + " sourceIndices: $monitorInputIndices" + log.error(message) + throw AlertingException.wrap( + OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) + ) + } + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()) + val searchSourceBuilder = SearchSourceBuilder() + searchSourceBuilder.query(boolQueryBuilder) + searchRequest.source(searchSourceBuilder) + log.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) + var response: SearchResponse + try { + response = monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it) + } + } catch (e: Exception) { + throw IllegalStateException( + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", + e + ) + } + + if (response.status() !== RestStatus.OK) { + throw IOException( + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" + ) + } + log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response") + percolateQueriesTimeTaken.getAndAdd(response.took.millis) + return response.hits + } + + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ + private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { + val boolQueryBuilder = QueryBuilders.boolQuery() + indices.forEach { boolQueryBuilder.should(QueryBuilders.matchQuery("index", it)) } + return boolQueryBuilder + } + + /** Transform field names and index names in all the search hits to format required to run percolate search against them. 
+ * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( + hits: SearchHits, + index: String, + concreteIndex: String, + monitorId: String, + conflictingFields: List, + docsSizeInBytes: AtomicLong, + ): List> { + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + constructSourceMapFromFieldsInHit(hit) + } + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) + return Pair( + hit.id, + TransformedDocDto(index, concreteIndex, hit.id, sourceRef) + ) + } catch (e: Exception) { + log.error( + "Monitor $monitorId: Failed to transform payload $hit for percolate query", + e + ) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) + } + + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values + } + return sourceMap + } + + /** + * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names + * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. + * + * Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV: + * { { + * "a": { "a": { + * "b": 1234 ----> "b_my_log_index_TReewWdsf2gdJFV": 1234 + * } } + * } + * + * @param jsonAsMap Input JSON (as Map) + * @param fieldNameSuffix Field suffix which is appended to existing field name + */ + private fun transformDocumentFieldNames( + jsonAsMap: MutableMap, + conflictingFields: List, + fieldNameSuffixPattern: String, + fieldNameSuffixIndex: String, + fieldNamePrefix: String, + ) { + val tempMap = mutableMapOf() + val it: MutableIterator> = jsonAsMap.entries.iterator() + while (it.hasNext()) { + val entry = it.next() + if (entry.value is Map<*, *>) { + transformDocumentFieldNames( + entry.value as MutableMap, + conflictingFields, + fieldNameSuffixPattern, + fieldNameSuffixIndex, + if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}" + ) + } else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) { + var alreadyReplaced = false + conflictingFields.forEach { conflictingField -> + if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) { + tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value + it.remove() + alreadyReplaced = true + } + } + if (!alreadyReplaced) { + tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value + it.remove() + } + } + } + jsonAsMap.putAll(tempMap) + } + + /** + * Returns true, if the docs fetched from shards thus far amount to less than threshold + * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. 
+ * + */ + private fun isInMemoryDocsSizeExceedingMemoryLimit( + docsBytesSize: Long, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + var thresholdPercentage = + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes + val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes + + return docsBytesSize > thresholdBytes + } + + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit( + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + var maxNumDocsThreshold = AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + return numDocs >= maxNumDocsThreshold + } + + private suspend fun performPercolateQueryAndResetCounters( + monitorCtx: MonitorRunnerExecutionContext, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + monitorInputIndices: List, + concreteIndices: List, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong, + ) { + try { + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + monitorCtx, + transformedDocs, + monitor, + monitorMetadata, + concreteIndices, + monitorInputIndices, + percolateQueriesTimeTaken + ) + + percolateQueryResponseHits.forEach { hit -> + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + } + } + totalDocsQueried.getAndAdd(transformedDocs.size.toLong()) + } finally { // no catch block because exception is caught and handled in runMonitor() class + transformedDocs.clear() + docsSizeInBytes.set(0) + } + } + + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes: AtomicLong, + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } + + private suspend fun runForEachDocTrigger( + monitorCtx: MonitorRunnerExecutionContext, + monitorResult: MonitorRunResult, + trigger: DocumentLevelTrigger, + monitor: Monitor, + idQueryMap: Map, + docsToQueries: MutableMap>, + queryToDocIds: Map>, + dryrun: Boolean, + workflowRunContext: WorkflowRunContext?, + executionId: String, + ): DocumentLevelTriggerRunResult { + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) + val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) + + val triggerFindingDocPairs = mutableListOf>() + + // TODO: Implement throttling for findings + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) + + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) + } + 
} + + val actionCtx = triggerCtx.copy( + triggeredDocs = triggerResult.triggeredDocs, + relatedFindings = findingToDocPairs.map { it.first }, + error = monitorResult.error ?: triggerResult.error + ) + + val alerts = mutableListOf() + triggerFindingDocPairs.forEach { + val alert = monitorCtx.alertService!!.composeDocLevelAlert( + listOf(it.first), + listOf(it.second), + triggerCtx, + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext + ) + alerts.add(alert) + } + + val shouldDefaultToPerExecution = defaultToPerExecutionAction( + monitorCtx.maxActionableAlertCount, + monitorId = monitor.id, + triggerId = trigger.id, + totalActionableAlertCount = alerts.size, + monitorOrTriggerError = actionCtx.error + ) + + for (action in trigger.actions) { + val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope + if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { + for (alert in alerts) { + val actionResults = + this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun) + triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } + triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) + } + } else if (alerts.isNotEmpty()) { + val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun) + for (alert in alerts) { + triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() } + triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults) + } + } + } + + // Alerts are saved after the actions since if there are failures in the actions, they can be stated in the alert + if (!dryrun && monitor.id != Monitor.NO_ID) { + val updatedAlerts = alerts.map { alert -> + val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap()) + val actionExecutionResults = actionResults.values.map { actionRunResult -> + ActionExecutionResult( + actionRunResult.actionId, + actionRunResult.executionTime, + if (actionRunResult.throttled) 1 else 0 + ) + } + alert.copy(actionExecutionResults = actionExecutionResults) + } + + monitorCtx.retryPolicy?.let { + monitorCtx.alertService!!.saveAlerts( + monitor.dataSources, + updatedAlerts, + it, + routingId = monitor.id + ) + } + } + return triggerResult + } + + suspend fun runAction( + action: Action, + ctx: TriggerExecutionContext, + monitorCtx: MonitorRunnerExecutionContext, + monitor: Monitor, + dryrun: Boolean, + ): ActionRunResult { + return try { + if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable( + action, + ctx.alert + ) + ) { + return ActionRunResult(action.id, action.name, mapOf(), true, null, null) + } + val actionOutput = mutableMapOf() + actionOutput[Action.SUBJECT] = if (action.subjectTemplate != null) + MonitorRunnerService.compileTemplate(action.subjectTemplate!!, ctx) + else "" + actionOutput[Action.MESSAGE] = MonitorRunnerService.compileTemplate(action.messageTemplate, ctx) + if (Strings.isNullOrEmpty(actionOutput[Action.MESSAGE])) { + throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") + } + if (!dryrun) { + val client = monitorCtx.client + client!!.threadPool().threadContext.stashContext().use { + withClosableContext( + InjectorContextElement( + monitor.id, + monitorCtx.settings!!, + monitorCtx.threadPool!!.threadContext, + monitor.user?.roles, + monitor.user + ) + ) { + 
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( + action, + monitorCtx, + actionOutput[Action.SUBJECT], + actionOutput[Action.MESSAGE]!! + ) + } + } + } + ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null) + } catch (e: Exception) { + ActionRunResult(action.id, action.name, mapOf(), false, MonitorRunnerService.currentTime(), e) + } + } + + protected suspend fun getConfigAndSendNotification( + action: Action, + monitorCtx: MonitorRunnerExecutionContext, + subject: String?, + message: String, + ): String { + val config = getConfigForNotificationAction(action, monitorCtx) + if (config.destination == null && config.channel == null) { + throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.destinationId}]") + } + + // Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type + // just for Alerting integration tests + if (config.destination?.isTestAction() == true) { + return "test action" + } + + if (config.destination?.isAllowed(monitorCtx.allowList) == false) { + throw IllegalStateException( + "Monitor contains a Destination type that is not allowed: ${config.destination.type}" + ) + } + + var actionResponseContent = "" + actionResponseContent = config.channel + ?.sendNotification( + monitorCtx.client!!, + config.channel.getTitle(subject), + message + ) ?: actionResponseContent + + actionResponseContent = config.destination + ?.buildLegacyBaseMessage( + subject, + message, + monitorCtx.destinationContextFactory!!.getDestinationContext(config.destination) + ) + ?.publishLegacyNotification(monitorCtx.client!!) + ?: actionResponseContent + + return actionResponseContent + } + + /** + * The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config + * depending on whether the background migration process has already migrated it from a Destination to a Notification config. + * + * To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved. + */ + private suspend fun getConfigForNotificationAction( + action: Action, + monitorCtx: MonitorRunnerExecutionContext, + ): NotificationActionConfigs { + var destination: Destination? = null + var notificationPermissionException: Exception? = null + + var channel: NotificationConfigInfo? 
= null + try { + channel = + NotificationApiUtils.getNotificationConfigInfo(monitorCtx.client as NodeClient, action.destinationId) + } catch (e: OpenSearchSecurityException) { + notificationPermissionException = e + } + + // If the channel was not found, try to retrieve the Destination + if (channel == null) { + destination = try { + val table = Table( + "asc", + "destination.name.keyword", + null, + 1, + 0, + null + ) + val getDestinationsRequest = GetDestinationsRequest( + action.destinationId, + 0L, + null, + table, + "ALL" + ) + + val getDestinationsResponse: GetDestinationsResponse = monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, it) + } + getDestinationsResponse.destinations.firstOrNull() + } catch (e: IllegalStateException) { + // Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned + null + } catch (e: OpenSearchSecurityException) { + if (notificationPermissionException != null) + throw notificationPermissionException + else + throw e + } + + if (destination == null && notificationPermissionException != null) + throw notificationPermissionException + } + + return NotificationActionConfigs(destination, channel) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index b0de10ff0..b1be43015 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -47,7 +47,7 @@ private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportExecuteMonitorAction @Inject constructor( - transportService: TransportService, + private val transportService: TransportService, private val client: Client, private val clusterService: ClusterService, private val runner: MonitorRunnerService, @@ -79,7 +79,7 @@ class TransportExecuteMonitorAction @Inject constructor( "Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " + "periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}" ) - val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun) + val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun, transportService) withContext(Dispatchers.IO) { actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt index 037628e9e..6c95f800e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt @@ -39,11 +39,11 @@ import java.time.Instant private val log = LogManager.getLogger(TransportExecuteWorkflowAction::class.java) class TransportExecuteWorkflowAction @Inject constructor( - transportService: TransportService, private val client: Client, private val runner: MonitorRunnerService, actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: 
NamedXContentRegistry, + val transportService: TransportService, ) : HandledTransportAction( ExecuteWorkflowAction.NAME, transportService, actionFilters, ::ExecuteWorkflowRequest ) { @@ -57,7 +57,7 @@ class TransportExecuteWorkflowAction @Inject constructor( val user: User? = User.parse(userStr) client.threadPool().threadContext.stashContext().use { - val executeWorkflow = fun(workflow: Workflow) { + val executeWorkflow = fun(workflow: Workflow, transportService: TransportService) { runner.launch { val (periodStart, periodEnd) = workflow.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execWorkflowRequest.requestEnd.millis)) @@ -67,7 +67,7 @@ class TransportExecuteWorkflowAction @Inject constructor( "dryrun: ${execWorkflowRequest.dryrun}" ) val workflowRunResult = - MonitorRunnerService.runJob(workflow, periodStart, periodEnd, execWorkflowRequest.dryrun) + MonitorRunnerService.runJob(workflow, periodStart, periodEnd, execWorkflowRequest.dryrun, transportService) withContext(Dispatchers.IO, { actionListener.onResponse( ExecuteWorkflowResponse( @@ -108,7 +108,7 @@ class TransportExecuteWorkflowAction @Inject constructor( response.sourceAsBytesRef, XContentType.JSON ).use { xcp -> val workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow - executeWorkflow(workflow) + executeWorkflow(workflow, transportService) } } } @@ -125,7 +125,7 @@ class TransportExecuteWorkflowAction @Inject constructor( false -> (execWorkflowRequest.workflow as Workflow).copy(user = user) } - executeWorkflow(workflow) + executeWorkflow(workflow, transportService) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index f0253858b..7722fc93b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -39,6 +39,7 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.index.query.QueryBuilders.boolQuery import org.opensearch.index.query.QueryBuilders.existsQuery import org.opensearch.index.query.QueryBuilders.termsQuery +import org.opensearch.transport.TransportService import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset @@ -54,6 +55,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { periodStart: Instant, periodEnd: Instant, dryRun: Boolean, + transportService: TransportService? ): WorkflowRunResult { val workflowExecutionStartTime = Instant.now() @@ -138,7 +140,16 @@ object CompositeWorkflowRunner : WorkflowRunner() { try { dataSources = delegateMonitor.dataSources val delegateRunResult = - runDelegateMonitor(delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext, executionId) + runDelegateMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext, + executionId, + transportService + ) resultList.add(delegateRunResult!!) } catch (ex: Exception) { logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex) @@ -242,6 +253,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { dryRun: Boolean, workflowRunContext: WorkflowRunContext, executionId: String, + transportService: TransportService?, ): MonitorRunResult<*>? 
{ if (delegateMonitor.isBucketLevelMonitor()) { @@ -252,7 +264,8 @@ object CompositeWorkflowRunner : WorkflowRunner() { periodEnd, dryRun, workflowRunContext, - executionId + executionId, + null ) } else if (delegateMonitor.isDocLevelMonitor()) { return DocumentLevelMonitorRunner.runMonitor( @@ -262,7 +275,8 @@ object CompositeWorkflowRunner : WorkflowRunner() { periodEnd, dryRun, workflowRunContext, - executionId + executionId, + transportService ) } else if (delegateMonitor.isQueryLevelMonitor()) { return QueryLevelMonitorRunner.runMonitor( @@ -272,7 +286,8 @@ object CompositeWorkflowRunner : WorkflowRunner() { periodEnd, dryRun, workflowRunContext, - executionId + executionId, + null ) } else { throw AlertingException.wrap( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt index 14488a16a..8ebb2be9c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt @@ -5,11 +5,51 @@ package org.opensearch.alerting.workflow +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder + data class WorkflowRunContext( // In case of dry run it's random generated id, while in other cases it's workflowId val workflowId: String, val workflowMetadataId: String, val chainedMonitorId: String?, val matchingDocIdsPerIndex: Map>, - val auditDelegateMonitorAlerts: Boolean -) + val auditDelegateMonitorAlerts: Boolean, +) : Writeable, ToXContentObject { + companion object { + fun readFrom(sin: StreamInput): WorkflowRunContext { + return WorkflowRunContext(sin) + } + } + + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readOptionalString(), + sin.readMap() as Map>, + sin.readBoolean() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowMetadataId) + out.writeOptionalString(chainedMonitorId) + out.writeMap(matchingDocIdsPerIndex) + out.writeBoolean(auditDelegateMonitorAlerts) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("workflow_id", workflowId) + .field("workflow_metadata_id", workflowMetadataId) + .field("chained_monitor_id", chainedMonitorId) + .field("matching_doc_ids_per_index", matchingDocIdsPerIndex) + .field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts) + .endObject() + return builder + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt index 4b954b168..76e2c72bd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt @@ -34,6 +34,7 @@ import org.opensearch.commons.notifications.model.NotificationConfigInfo import org.opensearch.core.common.Strings import org.opensearch.script.Script import org.opensearch.script.TemplateScript +import org.opensearch.transport.TransportService import java.time.Instant abstract class WorkflowRunner { @@ -42,7 +43,8 @@ abstract class 
WorkflowRunner { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryRun: Boolean + dryRun: Boolean, + transportService: TransportService? ): WorkflowRunResult suspend fun runAction( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 5d33000ef..e6c318eeb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -913,7 +913,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { /** A test index that can be used across tests. Feel free to add new fields but don't remove any. */ protected fun createTestIndex(index: String = randomAlphaOfLength(10).lowercase(Locale.ROOT)): String { createIndex( - index, Settings.EMPTY, + index, Settings.builder().put("number_of_shards", 10).build(), """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 943ad11a6..a9f22071e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -17,6 +17,7 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope @@ -26,6 +27,7 @@ import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS +import java.time.temporal.ChronoUnit.MINUTES import java.util.Locale class DocumentMonitorRunnerIT : AlertingRestTestCase() { @@ -120,7 +122,14 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) - val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, MINUTES) + ) + ) assertNotNull(monitor.id) indexDoc(testIndex, "1", testDoc) @@ -133,6 +142,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals(monitor.name, output["monitor_name"]) @Suppress("UNCHECKED_CAST") val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") val matchingDocsToQuery = searchResult[docQuery.id] as List assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) @@ -143,8 +153,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved 
for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5") || findings[1].relatedDocIds.contains("1")) } fun `test execute monitor with tag as trigger condition generates alerts and findings`() { @@ -183,8 +193,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5") || findings[1].relatedDocIds.contains("1")) } fun `test execute monitor input error`() { @@ -282,8 +292,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5") || findings[1].relatedDocIds.contains("1")) } fun `test execute monitor generates alerts and findings with per trigger execution for actions`() { @@ -345,8 +355,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5") || findings[1].relatedDocIds.contains("1")) } fun `test execute monitor with wildcard index that generates alerts and findings for EQUALS query operator`() { @@ -458,12 +468,12 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(testIndex2, "5", testDoc) executeMonitor(monitor.id) - var alerts = searchAlertsWithFilter(monitor) - assertEquals("Alert saved for test monitor", 2, alerts.size) - var findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 2a3527dd1..f86625082 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ 
b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -645,7 +645,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { .get() Assert.assertTrue(getAlertsResponse != null) Assert.assertTrue(getAlertsResponse.alerts.size == 1) - Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage == "IndexClosedException[closed]") + Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage!!.contains("IndexClosedException[closed]")) // Reopen index client().admin().indices().open(OpenIndexRequest(index)).get() // Close queryIndex @@ -1006,6 +1006,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), ) ) + assertIndexNotExists(SCHEDULED_JOBS_INDEX) var executeMonitorResponse = executeMonitor(monitor, null) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt index 6851c471d..f65949e96 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt @@ -5,26 +5,100 @@ package org.opensearch.alerting.model +import org.junit.Assert +import org.junit.Test +import org.opensearch.ResourceNotFoundException +import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup -import org.opensearch.alerting.randomActionRunResult import org.opensearch.alerting.randomBucketLevelMonitorRunResult import org.opensearch.alerting.randomBucketLevelTriggerRunResult import org.opensearch.alerting.randomDocumentLevelMonitorRunResult -import org.opensearch.alerting.randomDocumentLevelTriggerRunResult import org.opensearch.alerting.randomEmailAccount import org.opensearch.alerting.randomEmailGroup import org.opensearch.alerting.randomInputRunResults import org.opensearch.alerting.randomQueryLevelMonitorRunResult import org.opensearch.alerting.randomQueryLevelTriggerRunResult +import org.opensearch.common.UUIDs import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase +import java.time.Instant class WriteableTests : OpenSearchTestCase() { + @Test + fun `test DocumentLevelTriggerRunResult as stream`() { + val workflow = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = DocumentLevelTriggerRunResult(sin) + Assert.assertEquals("Round tripping dltrr failed", newWorkflow, workflow) + } + + @Test + fun `test dlmfor as stream`() { + val workflow = DocLevelMonitorFanOutResponse( + "nodeid", + "eid", + "monitorId", + mapOf("test" to ResourceNotFoundException("test")), + listOf("f1", "f2"), + mapOf("index" to mapOf("1" to "1")) as MutableMap, + InputRunResults(), + mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult()) + ) + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = 
DocLevelMonitorFanOutResponse(sin) // was DocumentLevelTriggerRunResult(sin), which can never equal the response
+        Assert.assertEquals("Round tripping DocLevelMonitorFanOutResponse failed", newWorkflow, workflow)
+    }
+
+    @Test
+    fun `test dlmfor1 as stream`() {
+        val workflow = DocLevelMonitorFanOutResponse(
+            "nodeid",
+            "eid",
+            "monitorId",
+            mapOf(),
+            listOf("f1", "f2"),
+            mapOf("index" to mapOf("1" to "1")) as MutableMap<String, Any>,
+            InputRunResults(),
+            mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult())
+        )
+        val out = BytesStreamOutput()
+        workflow.writeTo(out)
+        val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
+        val newWorkflow = DocLevelMonitorFanOutResponse(sin)
+        Assert.assertEquals("Round tripping DocLevelMonitorFanOutResponse failed", newWorkflow, workflow)
+    }
+
+    fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult {
+        val map = mutableMapOf<String, ActionRunResult>()
+        map["key1"] = randomActionRunResult() // was map.plus(...), which discards its result
+        map["key2"] = randomActionRunResult()
+        return DocumentLevelTriggerRunResult(
+            "trigger-name",
+            mutableListOf(UUIDs.randomBase64UUID().toString()),
+            null,
+            mutableMapOf(Pair("alertId", map))
+        )
+    }
+
+    fun randomActionRunResult(): ActionRunResult {
+        val map = mutableMapOf<String, String>()
+        map["key1"] = "val1"
+        map["key2"] = "val2"
+        return ActionRunResult(
+            "1234", "test-action", map,
+            false, Instant.now(), null
+        )
+    }
+
     fun `test actionrunresult as stream`() {
         val actionRunResult = randomActionRunResult()
         val out = BytesStreamOutput()
diff --git a/core/build.gradle b/core/build.gradle
index 7aeb8c284..19ddd2898 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -20,6 +20,7 @@ dependencies {
     }
     implementation 'com.google.guava:guava:32.0.1-jre'
     api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar")
+//    api "org.opensearch:common-utils:${common_utils_version}@jar"
     implementation 'commons-validator:commons-validator:1.7'
 
     testImplementation "org.opensearch.test:framework:${opensearch_version}"
diff --git a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt
index 3e87f207f..514eebdfe 100644
--- a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt
+++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt
@@ -31,6 +31,7 @@ import org.opensearch.core.xcontent.ToXContent
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.QueryBuilders
 import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.transport.TransportService
 import kotlin.coroutines.CoroutineContext
 import kotlin.coroutines.resume
 import kotlin.coroutines.resumeWithException
@@ -156,6 +157,20 @@ suspend fun <C : Client, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
     })
 }
 
+/**
+ * Converts [TransportService] methods that take a callback into a kotlin suspending function.
+ *
+ * @param block - a block of code that is passed an [ActionListener] that should be passed to the OpenSearch client API.
+ */
+suspend fun <C : TransportService, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
+    suspendCoroutine { cont ->
+        block(object : ActionListener<T> {
+            override fun onResponse(response: T) = cont.resume(response)
+
+            override fun onFailure(e: Exception) = cont.resumeWithException(e)
+        })
+    }
+
 /**
  * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function.
  *
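For orientation, the new fan-out action is invoked like any other transport action. A minimal sketch, assuming a NodeClient `client` and an already-built DocLevelMonitorFanOutRequest `fanOutRequest` (the coordinator-side wiring lives in DocumentLevelMonitorRunner and is not shown here):

    val response: DocLevelMonitorFanOutResponse = client.suspendUntil {
        execute(DocLevelMonitorFanOutAction.INSTANCE, fanOutRequest, it)
    }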