diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 9a9c75f32..27634af09 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -6,15 +6,14 @@ package org.opensearch.alerting import org.opensearch.action.ActionRequest -import org.opensearch.action.ActionResponse import org.opensearch.alerting.action.ExecuteMonitorAction +import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetEmailAccountAction import org.opensearch.alerting.action.GetEmailGroupAction -import org.opensearch.alerting.action.GetMonitorAction +import org.opensearch.alerting.action.GetRemoteIndexesAction import org.opensearch.alerting.action.SearchEmailAccountAction import org.opensearch.alerting.action.SearchEmailGroupAction -import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices @@ -33,25 +32,37 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction import org.opensearch.alerting.resthandler.RestGetEmailGroupAction import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction +import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction +import org.opensearch.alerting.resthandler.RestGetWorkflowAction +import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction +import org.opensearch.alerting.resthandler.RestIndexWorkflowAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction import org.opensearch.alerting.script.TriggerScript +import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction +import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction +import org.opensearch.alerting.transport.TransportDeleteWorkflowAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction +import org.opensearch.alerting.transport.TransportExecuteWorkflowAction import org.opensearch.alerting.transport.TransportGetAlertsAction import org.opensearch.alerting.transport.TransportGetDestinationsAction import org.opensearch.alerting.transport.TransportGetEmailAccountAction import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction +import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction +import org.opensearch.alerting.transport.TransportGetWorkflowAction +import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction import org.opensearch.alerting.transport.TransportIndexMonitorAction +import org.opensearch.alerting.transport.TransportIndexWorkflowAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction @@ -61,8 +72,6 @@ import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.io.stream.NamedWriteableRegistry -import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.IndexScopedSettings import org.opensearch.common.settings.Setting @@ -83,6 +92,9 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule +import org.opensearch.monitor.jvm.JvmStats +import org.opensearch.painless.spi.Allowlist +import org.opensearch.painless.spi.AllowlistLoader import org.opensearch.painless.spi.PainlessExtension import org.opensearch.painless.spi.Whitelist import org.opensearch.painless.spi.WhitelistLoader @@ -237,6 +249,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerTriggerService(TriggerService(scriptService)) .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) + .registerJvmStats(JvmStats.jvmStats()) + .registerWorkflowService(WorkflowService(client, xContentRegistry)) .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) @@ -284,6 +298,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_MAX_DOCS, AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, @@ -304,6 +320,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, @@ -316,7 +333,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5993dd929..a7d19a7bf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,13 +8,15 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException -import org.opensearch.action.ActionListener +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -27,12 +29,13 @@ import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -47,30 +50,46 @@ import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.util.string +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.percolator.PercolateQueryBuilderExt -import org.opensearch.rest.RestStatus +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID +import java.util.stream.Collectors import kotlin.math.max -object DocumentLevelMonitorRunner : MonitorRunner() { +class DocumentLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) + var nonPercolateSearchesTimeTakenStat = 0L + var percolateQueriesTimeTakenStat = 0L + var totalDocsQueriedStat = 0L + var docTransformTimeTakenStat = 0L + var totalDocsSizeInBytesStat = 0L + var docsSizeOfBatchInBytes = 0L + /* Contains list of docs source that are held in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() override suspend fun runMonitor( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext?, + executionId: String ): MonitorRunResult { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID @@ -96,7 +115,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata( monitor = monitor, createWithRunContext = false, - skipIndex = isTempMonitor + skipIndex = isTempMonitor, + workflowRunContext?.workflowMetadataId ) val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput @@ -114,11 +134,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices - val indices = IndexUtils.resolveAllIndices( + val allConcreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + if (allConcreteIndices.isEmpty()) { + logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}") + throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) + } monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( @@ -129,69 +153,142 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) // cleanup old indices that are not monitored anymore from the same monitor - for (ind in updatedLastRunContext.keys) { - if (!indices.contains(ind)) { + val runContextKeys = updatedLastRunContext.keys.toMutableSet() + for (ind in runContextKeys) { + if (!allConcreteIndices.contains(ind)) { updatedLastRunContext.remove(ind) } } - indices.forEach { indexName -> - // Prepare lastRunContext for each index - val indexLastRunContext = lastRunContext.getOrPut(indexName) { - val isIndexCreatedRecently = createdRecently( - monitor, - periodStart, - periodEnd, - monitorCtx.clusterService!!.state().metadata.index(indexName) - ) - MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently) - } + // Map of document ids per index when monitor is workflow delegate and has chained findings + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( - indexLastRunContext.toMutableMap(), - monitorCtx, - indexName - ) as MutableMap - updatedLastRunContext[indexName] = indexUpdatedRunContext - - val count: Int = indexLastRunContext["shards_count"] as Int - for (i: Int in 0 until count) { - val shard = i.toString() - - // update lastRunContext if its a temp monitor as we only want to view the last bit of data then - // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data - if (isTempMonitor) { - indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + val concreteIndicesSeenSoFar = mutableListOf() + val updatedIndexNames = mutableListOf() + docLevelMonitorInput.indices.forEach { indexName -> + var concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + var lastWriteIndex: String? = null + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) } } + concreteIndicesSeenSoFar.addAll(concreteIndices) + val updatedIndexName = indexName.replace("*", "_") + updatedIndexNames.add(updatedIndexName) + val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( + monitorCtx.clusterService!!.state(), + concreteIndices + ) - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) + concreteIndices.forEach { concreteIndexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + ) + MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) + } - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( + // Prepare updatedLastRunContext for each index + val indexUpdatedRunContext = updateLastRunContext( + indexLastRunContext.toMutableMap(), monitorCtx, - matchingDocs.map { it.second }, - monitor, - monitorMetadata, - indexName - ) + concreteIndexName, + ) as MutableMap + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + updatedLastRunContext.remove(lastWriteIndex) + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + } else { + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + + val count: Int = indexLastRunContext["shards_count"] as Int + for (i: Int in 0 until count) { + val shard = i.toString() - matchedQueriesForDocs.forEach { hit -> - val id = hit.id.replace("_${indexName}_${monitor.id}", "") + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + } + } - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$indexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + val fieldsToBeQueried = mutableSetOf() + if (monitorCtx.fetchOnlyQueryFieldNames) { + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) } + if (fieldsToBeQueried.isNotEmpty()) + logger.debug( + "Monitor ${monitor.id} Querying only fields " + + "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" + ) } + + // Prepare DocumentExecutionContext for each index + val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) + + fetchShardDataAndMaybeExecutePercolateQueries( + monitor, + monitorCtx, + docExecutionContext, + updatedIndexName, + concreteIndexName, + conflictingFields.toList(), + matchingDocIdsPerIndex?.get(concreteIndexName), + monitorMetadata, + inputRunResults, + docsToQueries, + updatedIndexNames, + concreteIndicesSeenSoFar, + ArrayList(fieldsToBeQueried) + ) } } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end */ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + monitor, + monitorMetadata, + updatedIndexNames, + concreteIndicesSeenSoFar, + inputRunResults, + docsToQueries, + ) + } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -211,10 +308,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) - } + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { monitor.triggers.forEach { @@ -226,7 +320,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { idQueryMap, docsToQueries, queryToDocIds, - dryrun + dryrun, + executionId = executionId, + workflowRunContext = workflowRunContext ) } } @@ -235,7 +331,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If any error happened during trigger execution, upsert monitor error alert val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) if (errorMessage.isNotEmpty()) { - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, + executionId = executionId, + workflowRunContext + ) } else { onSuccessfulMonitorRun(monitorCtx, monitor) } @@ -250,7 +351,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return monitorResult.copy(triggerResults = triggerResults) } catch (e: Exception) { val errorMessage = ExceptionsHelper.detailedMessage(e) - monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage) + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext) logger.error("Failed running Document-level-monitor ${monitor.name}", e) val alertingException = AlertingException( errorMessage, @@ -258,6 +359,22 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + } finally { + logger.debug( + "PERF_DEBUG_STATS: Monitor ${monitor.id} " + + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTakenStat" + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}", + monitor.id, + percolateQueriesTimeTakenStat + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}", + monitor.id, + docTransformTimeTakenStat + ) + logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueriedStat) } } @@ -296,40 +413,49 @@ object DocumentLevelMonitorRunner : MonitorRunner() { trigger: DocumentLevelTrigger, monitor: Monitor, idQueryMap: Map, - docsToQueries: Map>, + docsToQueries: MutableMap>, queryToDocIds: Map>, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext?, + executionId: String ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) - val findings = mutableListOf() - val findingDocPairs = mutableListOf>() + val triggerFindingDocPairs = mutableListOf>() // TODO: Implement throttling for findings - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID) - findings.add(findingId) + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) - if (triggerResult.triggeredDocs.contains(it.key)) { - findingDocPairs.add(Pair(findingId, it.key)) + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) } } val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - relatedFindings = findings, + relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) val alerts = mutableListOf() - findingDocPairs.forEach { + triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), listOf(it.second), triggerCtx, - monitorResult.alertError() ?: triggerResult.alertError() + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext ) alerts.add(alert) } @@ -369,54 +495,104 @@ object DocumentLevelMonitorRunner : MonitorRunner() { alert.copy(actionExecutionResults = actionExecutionResults) } - monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) } + monitorCtx.retryPolicy?.let { + monitorCtx.alertService!!.saveAlerts( + monitor.dataSources, + updatedAlerts, + it, + routingId = monitor.id + ) + } } return triggerResult } + /** + * 1. Bulk index all findings based on shouldCreateFinding flag + * 2. invoke publishFinding() to kickstart auto-correlations + * 3. Returns a list of pairs for finding id to doc id + */ private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docLevelQueries: List, - matchingDocId: String, - shouldCreateFinding: Boolean - ): String { - // Before the "|" is the doc id and after the "|" is the index - val docIndex = matchingDocId.split("|") + docsToQueries: MutableMap>, + idQueryMap: Map, + shouldCreateFinding: Boolean, + workflowExecutionId: String? = null, + ): List> { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocIds = listOf(docIndex[0]), - correlatedDocIds = listOf(docIndex[0]), - monitorId = monitor.id, - monitorName = monitor.name, - index = docIndex[1], - docLevelQueries = docLevelQueries, - timestamp = Instant.now() - ) + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - if (shouldCreateFinding) { - val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + logger.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .opType(DocWriteRequest.OpType.CREATE) } } + if (indexRequests.isNotEmpty()) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + try { - publishFinding(monitor, monitorCtx, finding) + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } } catch (e: Exception) { // suppress exception logger.error("Optional finding callback failed", e) } - return finding.id + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { + indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(batch), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + } + } + monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } private fun publishFinding( @@ -439,7 +615,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun updateLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, - index: String + index: String, ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() @@ -498,8 +674,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } - if (response.hits.hits.isEmpty()) + nonPercolateSearchesTimeTakenStat += response.took.millis + if (response.hits.hits.isEmpty()) { return -1L + } return response.hits.hits[0].seqNo } @@ -509,14 +687,27 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - private suspend fun getMatchingDocs( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String - ): List> { + indexName: String, + concreteIndexName: String, + conflictingFields: List, + docIds: List? = null, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + monitorInputIndices: List, + concreteIndices: List, + fieldsToBeQueried: List, + ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val matchingDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { @@ -525,30 +716,104 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - index, + concreteIndexName, shard, prevSeqNo, maxSeqNo, - null + docIds, + fieldsToBeQueried + ) + val startTime = System.currentTimeMillis() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields, + ) ) + docTransformTimeTakenStat += System.currentTimeMillis() - startTime + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id} :" + + " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + e + ) + } + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries, + ) + } + } + } + + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeOfBatchInBytes, monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } - if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, monitor.id)) + private suspend fun performPercolateQueryAndResetCounters( + monitorCtx: MonitorRunnerExecutionContext, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + monitorInputIndices: List, + concreteIndices: List, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { + try { + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + monitorCtx, + transformedDocs, + monitor, + monitorMetadata, + concreteIndices, + monitorInputIndices, + ) + + percolateQueryResponseHits.forEach { hit -> + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } - } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") } + totalDocsQueriedStat += transformedDocs.size.toLong() + } finally { + transformedDocs.clear() + docsSizeOfBatchInBytes = 0 } - return matchingDocs } + /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ private suspend fun searchShard( monitorCtx: MonitorRunnerExecutionContext, index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String? + docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -556,8 +821,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (query != null) { - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } val request: SearchRequest = SearchRequest() @@ -567,44 +832,64 @@ object DocumentLevelMonitorRunner : MonitorRunner() { SearchSourceBuilder() .version(true) .query(boolQueryBuilder) - .size(10000) // fixme: make this configurable. + .size(10000) ) + .preference(Preference.PRIMARY_FIRST.type()) + + if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) { + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search shard: $shard") + logger.error("Failed search shard. Response: $response") + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } + nonPercolateSearchesTimeTakenStat += response.took.millis return response.hits } - private suspend fun getMatchedQueries( + /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ + private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, - docs: List, + docs: MutableList>, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String + concreteIndices: List, + monitorInputIndices: List, ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) - - val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) } boolQueryBuilder.filter(percolateQueryBuilder) - - val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] - if (queryIndex == null) { - val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}" + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + + " sourceIndices: $monitorInputIndices" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex) + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - + logger.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -612,34 +897,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", + e ) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: $queryIndex") + throw IOException( + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" + ) } + logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + percolateQueriesTimeTakenStat += response.took.millis return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ + private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { + val boolQueryBuilder = QueryBuilders.boolQuery() + indices.forEach { boolQueryBuilder.should(QueryBuilders.matchQuery("index", it)) } + return boolQueryBuilder + } - private fun getAllDocs(hits: SearchHits, index: String, monitorId: String): List> { - return hits.map { hit -> - val sourceMap = hit.sourceAsMap - - transformDocumentFieldNames(sourceMap, "_${index}_$monitorId") - - var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) - - val sourceRef = BytesReference.bytes(xContentBuilder) - - logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) + /** Transform field names and index names in all the search hits to format required to run percolate search against them. + * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( + hits: SearchHits, + index: String, + concreteIndex: String, + monitorId: String, + conflictingFields: List, + ): List> { + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + constructSourceMapFromFieldsInHit(hit) + } + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + docsSizeOfBatchInBytes += sourceRef.ramBytesUsed() + totalDocsSizeInBytesStat += sourceRef.ramBytesUsed() + return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) + } catch (e: Exception) { + logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) + } - Pair(hit.id, sourceRef) + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values } + return sourceMap } /** - * Traverses document fields in leaves recursively and appends [fieldNameSuffix] to field names. + * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names + * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. * * Example for index name is my_log_index and Monitor ID is TReewWdsf2gdJFV: * { { @@ -653,19 +986,67 @@ object DocumentLevelMonitorRunner : MonitorRunner() { */ private fun transformDocumentFieldNames( jsonAsMap: MutableMap, - fieldNameSuffix: String + conflictingFields: List, + fieldNameSuffixPattern: String, + fieldNameSuffixIndex: String, + fieldNamePrefix: String ) { val tempMap = mutableMapOf() val it: MutableIterator> = jsonAsMap.entries.iterator() while (it.hasNext()) { val entry = it.next() if (entry.value is Map<*, *>) { - transformDocumentFieldNames(entry.value as MutableMap, fieldNameSuffix) - } else if (entry.key.endsWith(fieldNameSuffix) == false) { - tempMap["${entry.key}$fieldNameSuffix"] = entry.value - it.remove() + transformDocumentFieldNames( + entry.value as MutableMap, + conflictingFields, + fieldNameSuffixPattern, + fieldNameSuffixIndex, + if (fieldNamePrefix == "") entry.key else "$fieldNamePrefix.${entry.key}" + ) + } else if (!entry.key.endsWith(fieldNameSuffixPattern) && !entry.key.endsWith(fieldNameSuffixIndex)) { + var alreadyReplaced = false + conflictingFields.forEach { conflictingField -> + if (conflictingField == "$fieldNamePrefix.${entry.key}" || (fieldNamePrefix == "" && conflictingField == entry.key)) { + tempMap["${entry.key}$fieldNameSuffixIndex"] = entry.value + it.remove() + alreadyReplaced = true + } + } + if (!alreadyReplaced) { + tempMap["${entry.key}$fieldNameSuffixPattern"] = entry.value + it.remove() + } } } jsonAsMap.putAll(tempMap) } + + /** + * Returns true, if the docs fetched from shards thus far amount to less than threshold + * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. + * + */ + private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes + val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes + + return docsBytesSize > thresholdBytes + } + + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory + return numDocs >= maxNumDocsThreshold + } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. + */ + data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference + ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 2a6a57642..7ecf1a85d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings @@ -206,11 +207,19 @@ object MonitorMetadataService : val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf() try { if (index == null) return mutableMapOf() - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = client.suspendUntil { - client.admin().indices().getIndex(getIndexRequest, it) + + val indices = mutableListOf() + if (IndexUtils.isAlias(index, clusterService.state()) || + IndexUtils.isDataStream(index, clusterService.state()) + ) { + IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) } + } else { + val getIndexRequest = GetIndexRequest().indices(index) + val getIndexResponse: GetIndexResponse = client.suspendUntil { + client.admin().indices().getIndex(getIndexRequest, it) + } + indices.addAll(getIndexResponse.indices()) } - val indices = getIndexResponse.indices() indices.forEach { indexName -> if (!lastRunContext.containsKey(indexName)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 67fa29877..043ae88d4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool @@ -35,6 +36,8 @@ data class MonitorRunnerExecutionContext( var triggerService: TriggerService? = null, var alertService: AlertService? = null, var docLevelMonitorQueries: DocLevelMonitorQueries? = null, + var workflowService: WorkflowService? = null, + var jvmStats: JvmStats? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, @@ -46,5 +49,10 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + @Volatile var fetchOnlyQueryFieldNames: Boolean = true, + @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, + @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = + AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index e7b2f007f..b827d3d13 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -22,12 +22,17 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings @@ -45,6 +50,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -121,6 +127,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService { + this.monitorCtx.jvmStats = jvmStats + return this + } + // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( @@ -156,6 +167,28 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } + + monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) { + monitorCtx.fetchOnlyQueryFieldNames = it + } + + monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { + monitorCtx.percQueryMaxNumDocsInMemory = it + } + + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = + PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) { + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it + } + return this } @@ -218,11 +251,28 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) { - if (job !is Monitor) { - throw IllegalArgumentException("Invalid job type") - } - launch { - runJob(job, periodStart, periodEnd, false) + when (job) { + is Workflow -> { + launch { + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + runJob(job, periodStart, periodEnd, false) + } + } + is Monitor -> { + launch { + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + runJob(job, periodStart, periodEnd, false) + } + } + else -> { + throw IllegalArgumentException("Invalid job type") + } } } @@ -248,7 +298,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun) } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun) + DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else { QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 7dd90b106..6f822006c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,9 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 + const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 + const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -25,6 +28,39 @@ class AlertingSettings { Setting.Property.Dynamic ) + /** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query + * index in document level monitor execution. + */ + val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", + 10, + 0, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document + * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate + * query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current + * execution + */ + val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", + DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** + * Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries. + * Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards. + */ + val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting( + "plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled", + true, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, @@ -176,5 +212,12 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 1, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 9b008a081..3b586b759 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.MonitorRunnerService.monitorCtx import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -155,8 +156,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ fun traverseMappingsAndUpdate( node: MutableMap, currentPath: String, - processLeafFn: (String, MutableMap) -> Triple>, - flattenPaths: MutableList + processLeafFn: (String, String, MutableMap) -> Triple>, + flattenPaths: MutableMap> ) { // If node contains "properties" property then it is internal(non-leaf) node log.debug("Node in traverse: $node") @@ -170,10 +171,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // If it has type property and type is not "nested" then this is a leaf if (nodeProps.containsKey(TYPE) && nodeProps[TYPE] != NESTED) { // At this point we know full path of node, so we add it to output array - flattenPaths.add(fullPath) + flattenPaths.put(fullPath, nodeProps) // Calls processLeafFn and gets old node name, new node name and new properties of node. // This is all information we need to update this node - val (oldName, newName, props) = processLeafFn(it.key, it.value as MutableMap) + val (oldName, newName, props) = processLeafFn(it.key, fullPath, it.value as MutableMap) newNodes.add(Triple(oldName, newName, props)) } else { // Internal(non-leaf) node - visit children @@ -201,59 +202,119 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput val queries: List = docLevelMonitorInput.queries - val indices = IndexUtils.resolveAllIndices( - docLevelMonitorInput.indices, - monitorCtx.clusterService!!, - monitorCtx.indexNameExpressionResolver!! - ) - + val indices = docLevelMonitorInput.indices val clusterState = clusterService.state() // Run through each backing index and apply appropriate mappings to query index - indices?.forEach { indexName -> - if (clusterState.routingTable.hasIndex(indexName)) { - val indexMetadata = clusterState.metadata.index(indexName) - if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { - val properties = ( - (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as MutableMap - ) - // Node processor function is used to process leaves of index mappings tree - // - val leafNodeProcessor = - fun(fieldName: String, props: MutableMap): Triple> { - val newProps = props.toMutableMap() - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { - mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + indices.forEach { indexName -> + var concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) + } + } + val updatedIndexName = indexName.replace("*", "_") + val updatedProperties = mutableMapOf() + val allFlattenPaths = mutableSetOf>() + var sourceIndexFieldLimit = 0L + val conflictingFields = getAllConflictingFields(clusterState, concreteIndices) + + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, fullPath: String, props: MutableMap): + Triple> { + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } + } + } + + return if (conflictingFields.contains(fullPath)) { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${concreteIndexName}_$monitorId" + } + Triple(fieldName, "${fieldName}_${concreteIndexName}_$monitorId", newProps) + } else { + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId" } + Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps) } } - if (props.containsKey("path")) { - newProps["path"] = "${props["path"]}_${indexName}_$monitorId" + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableMapOf>() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + flattenPaths.keys.forEach { allFlattenPaths.add(Pair(it, concreteIndexName)) } + // Updated mappings ready to be applied on queryIndex + properties.forEach { + if ( + it.value is Map<*, *> && + (it.value as Map).containsKey("type") && + (it.value as Map)["type"] == NESTED + ) { + } else { + if (updatedProperties.containsKey(it.key) && updatedProperties[it.key] != it.value) { + val mergedField = mergeConflictingFields( + updatedProperties[it.key] as Map, + it.value as Map + ) + updatedProperties[it.key] = mergedField + } else { + updatedProperties[it.key] = it.value + } } - return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) } - // Traverse and update index mappings here while extracting flatten field paths - val flattenPaths = mutableListOf() - traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) - // Updated mappings ready to be applied on queryIndex - val updatedProperties = properties - // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. - var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( - monitor, - monitorMetadata, - indexName, - updatedProperties - ) - - if (updateMappingResponse.isAcknowledged) { - doIndexAllQueries(concreteQueryIndex, indexName, monitorId, queries, flattenPaths, refreshPolicy, indexTimeout) + sourceIndexFieldLimit += checkMaxFieldLimit(concreteIndexName) } } } + // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. + val (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( + monitor, + monitorMetadata, + updatedIndexName, + sourceIndexFieldLimit, + updatedProperties + ) + + if (updateMappingResponse.isAcknowledged) { + doIndexAllQueries( + concreteQueryIndex, + updatedIndexName, + monitorId, + queries, + allFlattenPaths, + conflictingFields, + refreshPolicy, + indexTimeout + ) + } } } @@ -262,18 +323,60 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ sourceIndex: String, monitorId: String, queries: List, - flattenPaths: MutableList, + flattenPaths: MutableSet>, + conflictingPaths: Set, refreshPolicy: RefreshPolicy, indexTimeout: TimeValue ) { val indexRequests = mutableListOf() + val conflictingPathToConcreteIndices = mutableMapOf>() + flattenPaths.forEach { fieldPath -> + if (conflictingPaths.contains(fieldPath.first)) { + if (conflictingPathToConcreteIndices.containsKey(fieldPath.first)) { + val concreteIndexSet = conflictingPathToConcreteIndices[fieldPath.first] + concreteIndexSet!!.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } else { + val concreteIndexSet = mutableSetOf() + concreteIndexSet.add(fieldPath.second) + conflictingPathToConcreteIndices[fieldPath.first] = concreteIndexSet + } + } + } + + val newQueries = mutableListOf() queries.forEach { + val filteredConcreteIndices = mutableSetOf() + var query = it.query + conflictingPaths.forEach { conflictingPath -> + if (query.contains(conflictingPath)) { + query = query.replace("$conflictingPath:", "${conflictingPath}__$monitorId:") + filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!) + } + } + + if (filteredConcreteIndices.isNotEmpty()) { + filteredConcreteIndices.forEach { filteredConcreteIndex -> + val newQuery = it.copy( + id = "${it.id}_$filteredConcreteIndex", + query = query.replace("", filteredConcreteIndex) + ) + newQueries.add(newQuery) + } + } else { + newQueries.add(it.copy(id = "${it.id}_$sourceIndex")) + } + } + + newQueries.forEach { var query = it.query flattenPaths.forEach { fieldPath -> - query = query.replace("$fieldPath:", "${fieldPath}_${sourceIndex}_$monitorId:") + if (!conflictingPaths.contains(fieldPath.first)) { + query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:") + } } val indexRequest = IndexRequest(concreteQueryIndex) - .id(it.id + "_${sourceIndex}_$monitorId") + .id(it.id + "_$monitorId") .source( mapOf( "query" to mapOf("query_string" to mapOf("query" to query)), @@ -303,6 +406,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ monitor: Monitor, monitorMetadata: MonitorMetadata, sourceIndex: String, + sourceIndexFieldLimit: Long, updatedProperties: MutableMap ): Pair { var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] @@ -325,7 +429,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ var updateMappingResponse = AcknowledgedResponse(false) try { // Adjust max field limit in mappings for query index, if needed. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) updateMappingResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) } @@ -339,7 +443,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Do queryIndex rollover targetQueryIndex = rolloverQueryIndex(monitor) // Adjust max field limit in mappings for new index. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) // PUT mappings to newly created index val updateMappingRequest = PutMappingRequest(targetQueryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) @@ -379,26 +483,96 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return Pair(updateMappingResponse, targetQueryIndex) } + /** + * merge conflicting leaf fields in the mapping tree + */ + private fun mergeConflictingFields(oldField: Map, newField: Map): Map { + val mergedField = mutableMapOf() + oldField.entries.forEach { + if (newField.containsKey(it.key)) { + if (it.value is Map<*, *> && newField[it.key] is Map<*, *>) { + mergedField[it.key] = + mergeConflictingFields(it.value as Map, newField[it.key] as Map) + } else { + mergedField[it.key] = it.value + } + } else { + mergedField[it.key] = it.value + } + } + + newField.entries.forEach { + if (!oldField.containsKey(it.key)) { + mergedField[it.key] = it.value + } + } + return mergedField + } + + /** + * get all fields which have same name but different mappings belonging to an index pattern + */ + fun getAllConflictingFields(clusterState: ClusterState, concreteIndices: List): Set { + val conflictingFields = mutableSetOf() + val allFlattenPaths = mutableMapOf>() + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, _: String, props: MutableMap): Triple> { + return Triple(fieldName, fieldName, props) + } + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableMapOf>() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + + flattenPaths.forEach { + if (allFlattenPaths.containsKey(it.key) && allFlattenPaths[it.key]!! != it.value) { + conflictingFields.add(it.key) + } + allFlattenPaths.putIfAbsent(it.key, it.value) + } + } + } + } + return conflictingFields + } + + /** + * checks the max field limit for a concrete index + */ + private suspend fun checkMaxFieldLimit(sourceIndex: String): Long { + val getSettingsResponse: GetSettingsResponse = client.suspendUntil { + admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex), it) + } + return getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L + } + /** * Adjusts max field limit index setting for query index if source index has higher limit. * This will prevent max field limit exception, when source index has more fields then query index limit */ - private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) { + private suspend fun adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit: Long, concreteQueryIndex: String) { val getSettingsResponse: GetSettingsResponse = client.suspendUntil { - admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it) + admin().indices().getSettings(GetSettingsRequest().indices(concreteQueryIndex), it) } - val sourceIndexLimit = - getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L val queryIndexLimit = getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L // Our query index initially has 3 fields we defined and 5 more builtin metadata fields in mappings so we have to account for that - if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { + if (sourceIndexFieldLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil { admin().indices().updateSettings( UpdateSettingsRequest(concreteQueryIndex).settings( Settings.builder().put( INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, - sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT + sourceIndexFieldLimit + QUERY_INDEX_BASE_FIELDS_COUNT ) ), it diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 9edf19f08..125870c3c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService @@ -21,7 +22,6 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.util.IndexUtils import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser -import org.opensearch.index.IndexNotFoundException class IndexUtils { @@ -150,11 +150,49 @@ class IndexUtils { result.addAll(concreteIndices) } - if (result.size == 0) { - throw IndexNotFoundException(indices[0]) + return result + } + + @JvmStatic + fun isDataStream(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().dataStreams().containsKey(name) + } + + @JvmStatic + fun isAlias(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().hasAlias(name) + } + + @JvmStatic + fun getWriteIndex(index: String, clusterState: ClusterState): String? { + if (isAlias(index, clusterState) || isDataStream(index, clusterState)) { + val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex + if (metadata != null) { + return metadata.index.name + } } + return null + } - return result + @JvmStatic + fun getNewestIndicesByCreationDate(concreteIndices: List, clusterState: ClusterState, thresholdDate: Long): List { + val filteredIndices = mutableListOf() + val lookup = clusterState.metadata().indicesLookup + concreteIndices.forEach { indexName -> + val index = lookup[indexName] + val indexMetadata = clusterState.metadata.index(indexName) + if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) { + if (indexMetadata.creationDate >= thresholdDate) { + filteredIndices.add(indexName) + } + } + } + return filteredIndices + } + + @JvmStatic + fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long { + return clusterState.metadata.index(index).creationDate } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt new file mode 100644 index 000000000..a22b09bdc --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -0,0 +1,392 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.workflow + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.BucketLevelMonitorRunner +import org.opensearch.alerting.DocumentLevelMonitorRunner +import org.opensearch.alerting.MonitorRunnerExecutionContext +import org.opensearch.alerting.QueryLevelMonitorRunner +import org.opensearch.alerting.WorkflowMetadataService +import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.isDocLevelMonitor +import org.opensearch.alerting.util.isQueryLevelMonitor +import org.opensearch.cluster.routing.Preference +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.ChainedAlertTrigger +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.query.QueryBuilders.boolQuery +import org.opensearch.index.query.QueryBuilders.existsQuery +import org.opensearch.index.query.QueryBuilders.termsQuery +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.UUID + +object CompositeWorkflowRunner : WorkflowRunner() { + + private val logger = LogManager.getLogger(javaClass) + + override suspend fun runWorkflow( + workflow: Workflow, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean, + ): WorkflowRunResult { + val workflowExecutionStartTime = Instant.now() + + val isTempWorkflow = dryRun || workflow.id == Workflow.NO_ID + + val executionId = generateExecutionId(isTempWorkflow, workflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = workflow, + skipIndex = isTempWorkflow, + executionId = executionId + ) + var dataSources: DataSources? = null + logger.debug("Workflow ${workflow.id} in $executionId execution is running") + val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + var monitors: List + + try { + monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + } catch (e: Exception) { + logger.error("Failed getting workflow delegates. Error: ${e.message}", e) + return WorkflowRunResult( + workflow.id, + workflow.name, + emptyList(), + workflowExecutionStartTime, + Instant.now(), + executionId, + AlertingException.wrap(e) + ) + } + // Validate the monitors size + validateMonitorSize(delegates, monitors, workflow) + val monitorsById = monitors.associateBy { it.id } + val resultList = mutableListOf>() + var lastErrorDelegateRun: Exception? = null + + for (delegate in delegates) { + var indexToDocIds = mapOf>() + var delegateMonitor: Monitor + delegateMonitor = monitorsById[delegate.monitorId] + ?: throw AlertingException.wrap( + IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + if (delegate.chainedMonitorFindings != null) { + val chainedMonitorIds: MutableList = mutableListOf() + if (delegate.chainedMonitorFindings!!.monitorId.isNullOrBlank()) { + chainedMonitorIds.addAll(delegate.chainedMonitorFindings!!.monitorIds) + } else { + chainedMonitorIds.add(delegate.chainedMonitorFindings!!.monitorId!!) + } + val chainedMonitors = mutableListOf() + chainedMonitorIds.forEach { + val chainedMonitor = monitorsById[it] + ?: throw AlertingException.wrap( + IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + chainedMonitors.add(chainedMonitor) + } + + try { + indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) + } catch (e: Exception) { + logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e) + return WorkflowRunResult( + workflow.id, workflow.name, emptyList(), workflowExecutionStartTime, Instant.now(), executionId, + AlertingException.wrap(e) + ) + } + } + val workflowRunContext = WorkflowRunContext( + workflowId = workflowMetadata.workflowId, + workflowMetadataId = workflowMetadata.id, + chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, + matchingDocIdsPerIndex = indexToDocIds, + auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true + else workflow.auditDelegateMonitorAlerts!! + ) + try { + dataSources = delegateMonitor.dataSources + val delegateRunResult = + runDelegateMonitor(delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext, executionId) + resultList.add(delegateRunResult!!) + } catch (ex: Exception) { + logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex) + lastErrorDelegateRun = AlertingException.wrap(ex) + break + } + } + logger.debug("Workflow ${workflow.id} delegate monitors in execution $executionId completed") + // Update metadata only if the workflow is not temp + if (!isTempWorkflow) { + WorkflowMetadataService.upsertWorkflowMetadata( + workflowMetadata.copy(latestRunTime = workflowExecutionStartTime, latestExecutionId = executionId), + true + ) + } + val triggerResults = mutableMapOf() + val workflowRunResult = WorkflowRunResult( + workflowId = workflow.id, + workflowName = workflow.name, + monitorRunResults = resultList, + executionStartTime = workflowExecutionStartTime, + executionEndTime = null, + executionId = executionId, + error = lastErrorDelegateRun, + triggerResults = triggerResults + ) + val currentAlerts = try { + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources!!) + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(dataSources) + monitorCtx.alertService!!.loadCurrentAlertsForWorkflow(workflow, dataSources) + } catch (e: Exception) { + logger.error("Failed to fetch current alerts for workflow", e) + // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts + val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id + logger.error("Error loading alerts for workflow: $id", e) + return workflowRunResult.copy(error = e) + } + try { + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(dataSources) + val updatedAlerts = mutableListOf() + val monitorIdToAlertIdsMap = fetchAlertsGeneratedInCurrentExecution(dataSources, executionId, monitorCtx, workflow) + for (trigger in workflow.triggers) { + val currentAlert = currentAlerts[trigger] + val caTrigger = trigger as ChainedAlertTrigger + val triggerCtx = ChainedAlertTriggerExecutionContext( + workflow = workflow, + workflowRunResult = workflowRunResult, + periodStart = workflowRunResult.executionStartTime, + periodEnd = workflowRunResult.executionEndTime, + trigger = caTrigger, + alertGeneratingMonitors = monitorIdToAlertIdsMap.keys, + monitorIdToAlertIdsMap = monitorIdToAlertIdsMap, + alert = currentAlert + ) + runChainedAlertTrigger( + monitorCtx, + workflow, + trigger, + executionId, + triggerCtx, + dryRun, + triggerResults, + updatedAlerts + ) + } + if (!dryRun && workflow.id != Workflow.NO_ID && updatedAlerts.isNotEmpty()) { + monitorCtx.retryPolicy?.let { + monitorCtx.alertService!!.saveAlerts( + dataSources, + updatedAlerts, + it, + routingId = workflow.id + ) + } + } + } catch (e: Exception) { + // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts + val id = if (workflow.id.trim().isEmpty()) "_na_" else workflow.id + logger.error("Error loading current chained alerts for workflow: $id", e) + return WorkflowRunResult( + workflowId = workflow.id, + workflowName = workflow.name, + monitorRunResults = emptyList(), + executionStartTime = workflowExecutionStartTime, + executionEndTime = Instant.now(), + executionId = executionId, + error = AlertingException.wrap(e), + triggerResults = emptyMap() + ) + } + workflowRunResult.executionEndTime = Instant.now() + return workflowRunResult + } + + private suspend fun runDelegateMonitor( + delegateMonitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean, + workflowRunContext: WorkflowRunContext, + executionId: String, + ): MonitorRunResult<*>? { + + if (delegateMonitor.isBucketLevelMonitor()) { + return BucketLevelMonitorRunner.runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext, + executionId + ) + } else if (delegateMonitor.isDocLevelMonitor()) { + return DocumentLevelMonitorRunner().runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext, + executionId + ) + } else if (delegateMonitor.isQueryLevelMonitor()) { + return QueryLevelMonitorRunner.runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext, + executionId + ) + } else { + throw AlertingException.wrap( + IllegalStateException("Unsupported monitor type ${delegateMonitor.monitorType}") + ) + } + } + + fun generateExecutionId( + isTempWorkflow: Boolean, + workflow: Workflow, + ): String { + val randomPart = "_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}" + return if (isTempWorkflow) randomPart else workflow.id.plus(randomPart) + } + + private fun validateMonitorSize( + delegates: List, + monitors: List, + workflow: Workflow, + ) { + if (delegates.size != monitors.size) { + val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString() + logger.error("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + throw AlertingException.wrap( + IllegalStateException("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + ) + } + } + + private suspend fun runChainedAlertTrigger( + monitorCtx: MonitorRunnerExecutionContext, + workflow: Workflow, + trigger: ChainedAlertTrigger, + executionId: String, + triggerCtx: ChainedAlertTriggerExecutionContext, + dryRun: Boolean, + triggerResults: MutableMap, + updatedAlerts: MutableList, + ) { + val triggerRunResult = monitorCtx.triggerService!!.runChainedAlertTrigger( + workflow, trigger, triggerCtx.alertGeneratingMonitors, triggerCtx.monitorIdToAlertIdsMap + ) + triggerResults[trigger.id] = triggerRunResult + if (monitorCtx.triggerService!!.isChainedAlertTriggerActionable(triggerCtx, triggerRunResult)) { + val actionCtx = triggerCtx + for (action in trigger.actions) { + triggerRunResult.actionResults[action.id] = this.runAction(action, actionCtx, monitorCtx, workflow, dryRun) + } + } + val alert = monitorCtx.alertService!!.composeChainedAlert( + triggerCtx, executionId, workflow, triggerRunResult.associatedAlertIds.toList(), triggerRunResult + ) + if (alert != null) { + updatedAlerts.add(alert) + } + } + + private suspend fun fetchAlertsGeneratedInCurrentExecution( + dataSources: DataSources, + executionId: String, + monitorCtx: MonitorRunnerExecutionContext, + workflow: Workflow, + ): MutableMap> { + try { + val searchRequest = + SearchRequest(getDelegateMonitorAlertIndex(dataSources, workflow, monitorCtx.alertIndices!!.isAlertHistoryEnabled())) + searchRequest.preference(Preference.PRIMARY_FIRST.type()) + val queryBuilder = boolQuery() + queryBuilder.must(QueryBuilders.termQuery("execution_id", executionId)) + queryBuilder.must(QueryBuilders.termQuery("state", getDelegateMonitorAlertState(workflow).name)) + val noErrorQuery = boolQuery() + .should(boolQuery().mustNot(existsQuery(Alert.ERROR_MESSAGE_FIELD))) + .should(termsQuery(Alert.ERROR_MESSAGE_FIELD, "")) + queryBuilder.must(noErrorQuery) + searchRequest.source().query(queryBuilder).size(9999) + val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) } + val alerts = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + monitorCtx.xContentRegistry, LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert + } + val map = mutableMapOf>() + for (alert in alerts) { + if (map.containsKey(alert.monitorId)) { + map[alert.monitorId]!!.add(alert.id) + } else { + map[alert.monitorId] = mutableSetOf(alert.id) + } + } + return map + } catch (e: Exception) { + logger.error("failed to get alerts generated by delegate monitors in current execution $executionId", e) + return mutableMapOf() + } + } + + fun getDelegateMonitorAlertIndex( + dataSources: DataSources, + workflow: Workflow, + isAlertHistoryEnabled: Boolean, + ): String { + return if (workflow.triggers.isNotEmpty()) { + if (isAlertHistoryEnabled) { + dataSources.alertsHistoryIndex!! + } else dataSources.alertsIndex + } else dataSources.alertsIndex + } + + fun getDelegateMonitorAlertState( + workflow: Workflow, + ): Alert.State { + return if (workflow.triggers.isNotEmpty()) { + Alert.State.AUDIT + } else Alert.State.ACTIVE + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index 421dc202c..1bfea4ebc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -1,7 +1,7 @@ { "dynamic": "strict", "_meta" : { - "schema_version": 2 + "schema_version": 4 }, "properties": { "schema_version": { @@ -46,6 +46,12 @@ "type" : "keyword" } } + }, + "fields": { + "type": "text" + }, + "query_field_names": { + "type": "keyword" } } }, @@ -60,6 +66,9 @@ "type" : "keyword" } } + }, + "execution_id": { + "type": "keyword" } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 07c115b95..7c025a6c0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -74,6 +74,8 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import kotlin.collections.ArrayList +import kotlin.collections.HashMap /** * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient @@ -774,7 +776,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response { val requestBody = StringEntity(doc, APPLICATION_JSON) val params = if (refresh) mapOf("refresh" to "true") else mapOf() - val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody) + val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody) assertTrue( "Unable to index doc: '${doc.take(15)}...' to index: '$index'", listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus()) @@ -810,6 +812,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return index } + protected fun createTestIndex(index: String, mapping: String?, alias: String): String { + createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias) + return index + } + protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String { try { createIndex( @@ -846,7 +853,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "") + val indexName = createTestIndex(index = it, mapping = "") val isWriteIndex = indices.getOrDefault(indexName, false) indicesMap[indexName] = isWriteIndex val indexMap = mapOf( @@ -863,17 +870,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return mutableMapOf(alias to indicesMap) } + protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) { + val indexPattern = "$datastream*" + var componentTemplateMappings = "\"properties\": {" + + " \"netflow.destination_transport_port\":{ \"type\": \"long\" }," + + " \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" + + "}" + if (mappings != null) { + componentTemplateMappings = mappings + } + if (useComponentTemplate) { + // Setup index_template + createComponentTemplateWithMappings( + "my_ds_component_template-$datastream", + componentTemplateMappings + ) + } + createComposableIndexTemplate( + "my_index_template_ds-$datastream", + listOf(indexPattern), + (if (useComponentTemplate) "my_ds_component_template-$datastream" else null), + mappings, + true, + 0 + ) + createDataStream(datastream) + } + + protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) { + client().makeRequest("PUT", "_data_stream/$datastream") + } + + protected fun deleteDataStream(datastream: String) { + client().makeRequest("DELETE", "_data_stream/$datastream") + } + + protected fun createIndexAlias(alias: String, mappings: String?) { + val indexPattern = "$alias*" + var componentTemplateMappings = "\"properties\": {" + + " \"netflow.destination_transport_port\":{ \"type\": \"long\" }," + + " \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" + + "}" + if (mappings != null) { + componentTemplateMappings = mappings + } + createComponentTemplateWithMappings( + "my_alias_component_template-$alias", + componentTemplateMappings + ) + createComposableIndexTemplate( + "my_index_template_alias-$alias", + listOf(indexPattern), + "my_alias_component_template-$alias", + mappings, + false, + 0 + ) + createTestIndex( + "$alias-000001", + null, + """ + "$alias": { + "is_write_index": true + } + """.trimIndent() + ) + } + + protected fun deleteIndexAlias(alias: String) { + client().makeRequest("DELETE", "$alias*/_alias/$alias") + } + + protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) { + val body = """{"template" : { "mappings": {$mappings} }}""" + client().makeRequest( + "PUT", + "_component_template/$componentTemplateName", + emptyMap(), + StringEntity(body, ContentType.APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun createComposableIndexTemplate( + templateName: String, + indexPatterns: List, + componentTemplateName: String?, + mappings: String?, + isDataStream: Boolean, + priority: Int + ) { + var body = "{\n" + if (isDataStream) { + body += "\"data_stream\": { }," + } + body += "\"index_patterns\": [" + + indexPatterns.stream().collect( + Collectors.joining(",", "\"", "\"") + ) + "]," + if (componentTemplateName == null) { + body += "\"template\": {\"mappings\": {$mappings}}," + } + if (componentTemplateName != null) { + body += "\"composed_of\": [\"$componentTemplateName\"]," + } + body += "\"priority\":$priority}" + client().makeRequest( + "PUT", + "_index_template/$templateName", + emptyMap(), + StringEntity(body, APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun getDatastreamWriteIndex(datastream: String): String { + val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null) + var respAsMap = responseAsMap(response) + if (respAsMap.containsKey("data_streams")) { + respAsMap = (respAsMap["data_streams"] as ArrayList>)[0] + val indices = respAsMap["indices"] as List> + val index = indices.last() + return index["index_name"] as String + } else { + respAsMap = respAsMap[datastream] as Map + } + val indices = respAsMap["indices"] as Array + return indices.last() + } + + protected fun rolloverDatastream(datastream: String) { + client().makeRequest( + "POST", + datastream + "/_rollover", + emptyMap(), + null + ) + } + protected fun randomAliasIndices( alias: String, num: Int = randomIntBetween(1, 10), includeWriteIndex: Boolean = true ): Map { val indices = mutableMapOf() - val writeIndex = randomIntBetween(0, num) + val writeIndex = randomIntBetween(0, num - 1) for (i: Int in 0 until num) { - var indexName = randomAlphaOfLength(10) + var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) while (indexName.equals(alias) || indices.containsKey(indexName)) - indexName = randomAlphaOfLength(10) + indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) indices[indexName] = includeWriteIndex && i == writeIndex } return indices diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 51027a6b1..d62d563c7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -5,10 +5,15 @@ package org.opensearch.alerting +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.client.Response import org.opensearch.client.ResponseException +import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput @@ -17,6 +22,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope +import org.opensearch.core.rest.RestStatus import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -36,7 +42,153 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val index = createTestIndex() - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test dryrun execute monitor with queryFieldNames set up with correct field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field")) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(0, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) @@ -79,7 +231,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -111,7 +263,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -151,7 +303,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", tags = listOf("test_tag")) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", tags = listOf("test_tag"), fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = Script("query[tag=test_tag]")) @@ -191,7 +343,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", tags = listOf("test_tag")) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", tags = listOf("test_tag"), fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -222,7 +374,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val alertCategories = AlertCategory.values() @@ -290,7 +442,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val actionExecutionScope = PerExecutionActionScope() @@ -357,7 +509,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) @@ -388,6 +540,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) } + fun `test execute monitor for bulk index findings`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + for (i in 0 until 9) { + indexDoc(testIndex, i.toString(), testDoc) + } + indexDoc(testIndex2, "3", testDoc) + adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Correct search result", 10, matchingDocsToQuery.size) + assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 10, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 10, findings.size) + val foundFindings = + findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + assertEquals("Found findings for all docs", 4, foundFindings.size) + } + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() { val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" val testQueryName = "wildcard-test-query" @@ -401,7 +601,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "NOT (test_field:\"us-west-1\")", name = testQueryName) + val docQuery = DocLevelQuery(query = "NOT (test_field:\"us-west-1\")", name = testQueryName, fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) @@ -442,7 +642,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -495,86 +695,620 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } - fun `test document-level monitor when alias only has write index with 0 docs`() { - // Monitor should execute, but create 0 findings. - val alias = createTestAlias(includeWriteIndex = true) - val aliasIndex = alias.keys.first() - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = listOf(aliasIndex), queries = listOf(query)) - val trigger = randomDocumentLevelTrigger(condition = Script("query[id=\"${query.id}\"]")) - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + fun `test execute monitor with indices having fields with same name but different data types`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source.device.port": { "type": "long" }, + "source.device.hwd.id": { "type": "long" }, + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + }, + "my_join_field": { + "type": "join", + "relations": { + "question": "answer" + } + }, + "test_field" : { "type" : "integer" } + } + """.trimIndent() + ) + var testDoc = """{ + "source" : { "device": {"port" : 12345 } }, + "nested_field": { "test1": "some text" }, + "test_field": 12345 + }""" - val response: Response - try { - response = executeMonitor(monitor.id) - } catch (e: ResponseException) { - assertNotNull("Expected an error message: $e", e.message) - e.message?.let { - assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) - } - assertEquals(404, e.response.statusLine.statusCode) - return - } + val docQuery1 = DocLevelQuery( + query = "(source.device.port:12345 AND test_field:12345) OR source.device.hwd.id:12345", + name = "4", + fields = listOf() + ) + val docQuery2 = DocLevelQuery( + query = "(source.device.port:\"12345\" AND test_field:\"12345\") OR source.device.hwd.id:\"12345\"", + name = "5", + fields = listOf() + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = searchFindings() + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.findings.forEach { - val queryIds = it.finding.docLevelQueries.map { query -> query.id } - assertFalse("No findings should exist with queryId ${query.id}, but found: $it", queryIds.contains(query.id)) - } - } + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) - fun `test document-level monitor when docs exist prior to monitor creation`() { - // FIXME: Consider renaming this test case - // Only new docs should create findings. - val alias = createTestAlias(includeWriteIndex = true) - val aliasIndex = alias.keys.first() - val indices = alias[aliasIndex]?.keys?.toList() as List - val query = randomDocLevelQuery(tags = listOf()) - val input = randomDocLevelMonitorInput(indices = listOf(aliasIndex), queries = listOf(query)) - val trigger = randomDocumentLevelTrigger(condition = Script("query[id=\"${query.id}\"]")) + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) - val preExistingDocIds = mutableSetOf() - indices.forEach { index -> - val docId = index.hashCode().toString() - val doc = """{ "message" : "${query.query}" }""" - preExistingDocIds.add(docId) - indexDoc(index = index, id = docId, doc = doc) - } - assertEquals(indices.size, preExistingDocIds.size) + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) - val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) - val response = executeMonitor(monitor.id) + indexDoc(testIndex, "2", testDoc) - val output = entityAsMap(response) - val inputResults = output.stringMap("input_results") - val errorMessage = inputResults?.get("error") - @Suppress("UNCHECKED_CAST") - val searchResult = (inputResults?.get("results") as List>).firstOrNull() - @Suppress("UNCHECKED_CAST") - val findings = searchFindings() + // no fields expanded as only index test1 is present + val oldExpectedQueries = listOf( + "(source.device.port_test__${monitor.id}:12345 AND test_field_test__${monitor.id}:12345) OR " + + "source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test__${monitor.id}:\"12345\" AND test_field_test__${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"" + ) - assertEquals(monitor.name, output["monitor_name"]) - assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) - findings.findings.forEach { - val docIds = it.finding.relatedDocIds - assertTrue( - "Findings index should not contain a pre-existing doc, but found $it", - preExistingDocIds.intersect(docIds).isEmpty() - ) + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(oldExpectedQueries.contains(query)) } - } - fun `test document-level monitor when alias indices only contain docs that match query`() { + val testIndex2 = createTestIndex( + "test2", + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + ) + testDoc = """{ + "source" : { "device": {"port" : "12345" } }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + // only fields source.device.port & test_field is expanded as they have same name but different data types + // in indices test1 & test2 + val newExpectedQueries = listOf( + "(source.device.port_test2_${monitor.id}:12345 AND test_field_test2_${monitor.id}:12345) " + + "OR source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test1_${monitor.id}:12345 AND test_field_test1_${monitor.id}:12345) " + + "OR source.device.hwd.id_test__${monitor.id}:12345", + "(source.device.port_test2_${monitor.id}:\"12345\" AND test_field_test2_${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"", + "(source.device.port_test1_${monitor.id}:\"12345\" AND test_field_test1_${monitor.id}:\"12345\") " + + "OR source.device.hwd.id_test__${monitor.id}:\"12345\"" + ) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(oldExpectedQueries.contains(query) || newExpectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but with different nesting`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "nested_field": { + "type": "nested", + "properties": { + "test1": { + "type": "keyword" + } + } + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "nested_field": { + "properties": { + "test1": { + "type": "keyword" + } + } + } + } + """.trimIndent() + ) + val testDoc = """{ + "nested_field": { "test1": "12345" } + }""" + + val docQuery = DocLevelQuery( + query = "nested_field.test1:\"12345\"", + name = "5", + fields = listOf() + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + // as mappings of source.id & test_field are different so, both of them expands + val expectedQueries = listOf( + "nested_field.test1_test__${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but different field mappings`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + }, + "test_field" : { + "type":"text", + "analyzer":"whitespace" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text" + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + val testDoc = """{ + "source" : {"id" : "12345" }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery = DocLevelQuery( + query = "test_field:\"12345\" AND source.id:\"12345\"", + name = "5", + fields = listOf() + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + // as mappings of source.id & test_field are different so, both of them expands + val expectedQueries = listOf( + "test_field_test2_${monitor.id}:\"12345\" AND source.id_test2_${monitor.id}:\"12345\"", + "test_field_test1_${monitor.id}:\"12345\" AND source.id_test1_${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + } + + fun `test execute monitor with indices having fields with same name but different field mappings in multiple indices`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "test_field" : { + "type":"keyword" + } + } + """.trimIndent() + ) + + val testIndex4 = createTestIndex( + "test4", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testDoc1 = """{ + "source" : {"device" : {"hwd" : {"id" : "12345"}} }, + "nested_field": { "test1": "some text" } + }""" + val testDoc2 = """{ + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery1 = DocLevelQuery( + query = "test_field:\"12345\"", + name = "4", + fields = listOf() + ) + val docQuery2 = DocLevelQuery( + query = "source.device.hwd.id:\"12345\"", + name = "5", + fields = listOf() + ) + + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex4, "1", testDoc1) + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex, "1", testDoc1) + indexDoc(testIndex, "2", testDoc2) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 4, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 4, findings.size) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + val httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) } + } + + fun `test no of queries generated for document-level monitor based on wildcard indexes`() { + val testIndex = createTestIndex("test1") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + + val testIndex2 = createTestIndex("test2") + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + } + + fun `test execute monitor with new index added after first execution that generates alerts and findings from new query`() { + val testIndex = createTestIndex("test1") + val testIndex2 = createTestIndex("test2") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docQuery2 = DocLevelQuery(query = "test_field_new:\"us-west-2\"", name = "4", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "5", testDoc) + executeMonitor(monitor.id) + + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) + + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) + + val testDocNew = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field_new" : "us-west-2" + }""" + + val testIndex3 = createTestIndex("test3") + indexDoc(testIndex3, "10", testDocNew) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery2.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("10|$testIndex3"))) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + + foundFindings = findings.filter { + it.relatedDocIds.contains("10") + } + assertEquals("Findings saved for test monitor expected 10", 1, foundFindings.size) + } + + fun `test document-level monitor when alias only has write index with 0 docs`() { + // Monitor should execute, but create 0 findings. + val alias = createTestAlias(includeWriteIndex = true) + val aliasIndex = alias.keys.first() + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = listOf(aliasIndex), queries = listOf(query)) + val trigger = randomDocumentLevelTrigger(condition = Script("query[id=\"${query.id}\"]")) + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response: Response + try { + response = executeMonitor(monitor.id) + } catch (e: ResponseException) { + assertNotNull("Expected an error message: $e", e.message) + e.message?.let { + assertTrue("Unexpected exception: $e", it.contains("""reason":"no such index [.opensearch-alerting-findings]""")) + } + assertEquals(404, e.response.statusLine.statusCode) + return + } + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val queryIds = it.finding.docLevelQueries.map { query -> query.id } + assertFalse("No findings should exist with queryId ${query.id}, but found: $it", queryIds.contains(query.id)) + } + } + + fun `test document-level monitor when docs exist prior to monitor creation`() { + // FIXME: Consider renaming this test case + // Only new docs should create findings. + val alias = createTestAlias(includeWriteIndex = true) + val aliasIndex = alias.keys.first() + val indices = alias[aliasIndex]?.keys?.toList() as List + val query = randomDocLevelQuery(tags = listOf()) + val input = randomDocLevelMonitorInput(indices = listOf(aliasIndex), queries = listOf(query)) + val trigger = randomDocumentLevelTrigger(condition = Script("query[id=\"${query.id}\"]")) + + val preExistingDocIds = mutableSetOf() + indices.forEach { index -> + val docId = index.hashCode().toString() + val doc = """{ "message" : "${query.query}" }""" + preExistingDocIds.add(docId) + indexDoc(index = index, id = docId, doc = doc) + } + assertEquals(indices.size, preExistingDocIds.size) + + val monitor = createMonitor(randomDocumentLevelMonitor(enabled = false, inputs = listOf(input), triggers = listOf(trigger))) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val errorMessage = inputResults?.get("error") + @Suppress("UNCHECKED_CAST") + val searchResult = (inputResults?.get("results") as List>).firstOrNull() + @Suppress("UNCHECKED_CAST") + val findings = searchFindings() + + assertEquals(monitor.name, output["monitor_name"]) + assertNull("Unexpected monitor execution failure: $errorMessage", errorMessage) + findings.findings.forEach { + val docIds = it.finding.relatedDocIds + assertTrue( + "Findings index should not contain a pre-existing doc, but found $it", + preExistingDocIds.intersect(docIds).isEmpty() + ) + } + } + + fun `test document-level monitor when alias indices only contain docs that match query`() { // Only new docs should create findings. val alias = createTestAlias(includeWriteIndex = true) val aliasIndex = alias.keys.first() @@ -690,6 +1424,314 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test document-level monitor when datastreams contain docs that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteDataStream(dataStreamName) + } + + fun `test document-level monitor when datastreams contain docs across read-only indices that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "2", testDoc) + rolloverDatastream(dataStreamName) + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "4", testDoc) + rolloverDatastream(dataStreamName) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "5", testDoc) + indexDoc(dataStreamName, "6", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + deleteDataStream(dataStreamName) + } + + fun `test document-level monitor when index alias contain docs that do match query`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteIndexAlias(aliasName) + } + + fun `test document-level monitor when multiple datastreams contain docs across read-only indices that do match query`() { + val dataStreamName1 = "test-datastream1" + createDataStream( + dataStreamName1, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + val dataStreamName2 = "test-datastream2" + createDataStream( + dataStreamName2, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName2, "-1", testDoc) + rolloverDatastream(dataStreamName2) + indexDoc(dataStreamName2, "0", testDoc) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("test-datastream*"), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + indexDoc(dataStreamName1, "1", testDoc) + indexDoc(dataStreamName2, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + + indexDoc(dataStreamName1, "2", testDoc) + indexDoc(dataStreamName2, "2", testDoc) + rolloverDatastream(dataStreamName1) + rolloverDatastream(dataStreamName1) + rolloverDatastream(dataStreamName2) + indexDoc(dataStreamName1, "4", testDoc) + indexDoc(dataStreamName2, "4", testDoc) + rolloverDatastream(dataStreamName1) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 4, matchingDocsToQuery.size) + + indexDoc(dataStreamName1, "5", testDoc) + indexDoc(dataStreamName1, "6", testDoc) + indexDoc(dataStreamName2, "5", testDoc) + indexDoc(dataStreamName2, "6", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 4, matchingDocsToQuery.size) + deleteDataStream(dataStreamName1) + deleteDataStream(dataStreamName2) + } + + fun `test document-level monitor ignoring old read-only indices for datastreams`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "-1", testDoc) + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "0", testDoc) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteDataStream(dataStreamName) + } + fun `test execute monitor with non-null data sources`() { val testIndex = createTestIndex() @@ -700,7 +1742,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { "test_field" : "us-west-2" }""" - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val alertCategories = AlertCategory.values() @@ -766,18 +1808,38 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) } - val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) - try { - createMonitor( - randomDocumentLevelMonitor( - inputs = listOf(docLevelInput), - triggers = listOf(trigger), - owner = "owner" - ) + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) ) - fail("Expected create monitor to fail") - } catch (e: ResponseException) { - assertTrue(e.message!!.contains("illegal_argument_exception")) - } + ) + + indexDoc(index1, "1", testDoc) + indexDoc(index2, "1", testDoc) + indexDoc(index4, "1", testDoc) + indexDoc(index5, "1", testDoc) + + var response = executeMonitor(monitor.id) + + var output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + deleteIndex(index1) + deleteIndex(index2) + + indexDoc(index4, "2", testDoc) + response = executeMonitor(monitor.id) + + output = entityAsMap(response) + assertEquals(1, output.objectMap("trigger_results").values.size) + } + + @Suppress("UNCHECKED_CAST") + /** helper that returns a field in a json map whose values are all json objects */ + private fun Map.objectMap(key: String): Map> { + return this[key] as Map> } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index f360cdd8d..7f577f717 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -276,6 +276,263 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Findings saved for test monitor", 4, findings.size) } + fun `test execute monitor without triggers`() { + val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3", fields = listOf()) + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + val testDoc = """{ + "eventType" : "login" + }""" + indexDoc(index, "1", testDoc) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + // Execute dry run first and expect no alerts or findings + var executeMonitorResponse = executeMonitor(monitor, id, true) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + var table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + var findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + + // Execute real run - expect findings, but no alerts + executeMonitorResponse = executeMonitor(monitor, id, false) + + searchAlerts(id) + table = Table("asc", "id", null, 1, 0, "") + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + + findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) + } + + fun `test all fields fetched and submitted to percolate query when one of the queries doesn't have queryFieldNames`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + fields = listOf() + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + fields = listOf() + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 7 queries", 7, findings[0].docLevelQueries.size) + } + + fun `test percolate query failure when queryFieldNames has alias`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + fields = listOf(), + queryFieldNames = listOf("source.ip.v6.v2") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + fields = listOf(), + queryFieldNames = listOf("source.ip.v6.v4") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + fields = listOf() + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].state.toString().equals(Alert.State.ERROR.toString())) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + } + fun `test execute monitor with custom query index`() { val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")