From 586459e017c9093ca6135ac5c47ace034399f924 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 19 Feb 2024 02:11:03 -0800 Subject: [PATCH 1/3] add jvm aware and max docs settings for batching docs for percolate queries Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 4 + .../alerting/DocumentLevelMonitorRunner.kt | 321 +++++++++++++----- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../alerting/MonitorRunnerService.kt | 14 + .../alerting/settings/AlertingSettings.kt | 23 ++ 5 files changed, 285 insertions(+), 79 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ea01b2524..0c09cbabf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -100,6 +100,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.painless.spi.Allowlist import org.opensearch.painless.spi.AllowlistLoader import org.opensearch.painless.spi.PainlessExtension @@ -263,6 +264,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerTriggerService(TriggerService(scriptService)) .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) + .registerJvmStats(JvmStats.jvmStats()) .registerWorkflowService(WorkflowService(client, xContentRegistry)) .registerConsumers() .registerDestinationSettings() @@ -320,6 +322,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_MAX_DOCS, AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 0e88b5cb3..498602f34 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -25,6 +25,8 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -60,12 +62,15 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.percolator.PercolateQueryBuilderExt +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID +import java.util.concurrent.atomic.AtomicLong +import java.util.stream.Collectors import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -83,7 +88,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - + var nonPercolateSearchesTimeTaken = AtomicLong(0) + var percolateQueriesTimeTaken = AtomicLong(0) + var totalDocsQueried = AtomicLong(0) + var docTransformTimeTaken = AtomicLong(0) try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -152,6 +160,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + /* Contains list of docs source that are held in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() + val docsSizeInBytes = AtomicLong(0) + val concreteIndicesSeenSoFar = mutableListOf() + val updatedIndexNames = mutableListOf() + val queryingStartTimeMillis = System.currentTimeMillis() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), @@ -173,7 +188,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } } + concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") + updatedIndexNames.add(updatedIndexName) val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -222,41 +239,43 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs( + fetchShardDataAndMaybeExecutePercolateQueries( monitor, monitorCtx, docExecutionContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), - matchingDocIdsPerIndex?.get(concreteIndexName) + matchingDocIdsPerIndex?.get(concreteIndexName), + monitorMetadata, + inputRunResults, + docsToQueries, + transformedDocs, + docsSizeInBytes, + updatedIndexNames, + concreteIndicesSeenSoFar, + nonPercolateSearchesTimeTaken, + percolateQueriesTimeTaken, + totalDocsQueried, + docTransformTimeTaken ) - - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, - monitor, - monitorMetadata, - updatedIndexName, - concreteIndexName - ) - - matchedQueriesForDocs.forEach { hit -> - val id = hit.id - .replace("_${updatedIndexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) - } - } - } } } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end */ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + updatedIndexNames, + concreteIndicesSeenSoFar, + inputRunResults, + docsToQueries + ) + } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -637,17 +656,32 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - private suspend fun getMatchingDocs( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String, - concreteIndex: String, + indexName: String, + concreteIndexName: String, conflictingFields: List, - docIds: List? = null - ): List> { + docIds: List? = null, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, + monitorInputIndices: List, + concreteIndices: List, + nonPercolateSearchesTimeTaken: AtomicLong, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong, + docTransformTimeTake: AtomicLong, + ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val matchingDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { @@ -656,24 +690,99 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - concreteIndex, + concreteIndexName, shard, prevSeqNo, maxSeqNo, null, docIds ) + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields, + docsSizeInBytes + ) + ) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id} :" + + " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + e + ) + } + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries + ) + } + } + } + + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes: AtomicLong, + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } + + private suspend fun performPercolateQueryAndResetCounters( + monitorCtx: MonitorRunnerExecutionContext, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + monitorInputIndices: List, + concreteIndices: List, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { + try { + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + monitorCtx, + transformedDocs, + monitor, + monitorMetadata, + concreteIndices, + monitorInputIndices + ) - if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) + percolateQueryResponseHits.forEach { hit -> + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } - } catch (e: Exception) { - logger.error("Failed to run for shard $shard. Error: ${e.message}") } + } finally { // no catch block because exception is caught and handled in runMonitor() class + transformedDocs.clear() + docsSizeInBytes.set(0) } - return matchingDocs } + /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ private suspend fun searchShard( monitorCtx: MonitorRunnerExecutionContext, index: String, @@ -704,46 +813,55 @@ object DocumentLevelMonitorRunner : MonitorRunner() { SearchSourceBuilder() .version(true) .query(boolQueryBuilder) - .size(10000) // fixme: make this configurable. + .size(10000) ) .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search shard: $shard") + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } return response.hits } - private suspend fun getMatchedQueries( + /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ + private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, - docs: List, + docs: MutableList>, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String, - concreteIndex: String + concreteIndices: List, + monitorInputIndices: List, ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) - - val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) } boolQueryBuilder.filter(percolateQueryBuilder) - - val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] - if (queryIndex == null) { - val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + + " sourceIndices: $monitorInputIndices" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - + logger.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -751,42 +869,58 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", + e ) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: $queryIndex") + throw IOException( + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" + ) } return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ + private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { + val boolQueryBuilder = QueryBuilders.boolQuery() + indices.forEach { boolQueryBuilder.should(QueryBuilders.matchQuery("index", it)) } + return boolQueryBuilder + } - private fun getAllDocs( + /** Transform field names and index names in all the search hits to format required to run percolate search against them. + * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( hits: SearchHits, index: String, concreteIndex: String, monitorId: String, - conflictingFields: List - ): List> { - return hits.map { hit -> - val sourceMap = hit.sourceAsMap - - transformDocumentFieldNames( - sourceMap, - conflictingFields, - "_${index}_$monitorId", - "_${concreteIndex}_$monitorId", - "" - ) - - var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) - - val sourceRef = BytesReference.bytes(xContentBuilder) - - logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) - - Pair(hit.id, sourceRef) - } + conflictingFields: List, + docsSizeInBytes: AtomicLong, + ): List> { + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = hit.sourceAsMap + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) + return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) + } catch (e: Exception) { + logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) } /** @@ -839,4 +973,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } jsonAsMap.putAll(tempMap) } + + /** + * Returns true, if the docs fetched from shards thus far amount to less than threshold + * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. + * + */ + private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes + val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes + + return docsBytesSize > thresholdBytes + } + + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + return numDocs >= maxNumDocsThreshold + } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. + */ + private data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference + ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 2c98495de..2e72af40b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext( var alertService: AlertService? = null, var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, + var jvmStats: JvmStats? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 103da2230..fe8e94734 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -50,6 +50,7 @@ import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.core.action.ActionListener import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -134,6 +135,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService { + this.monitorCtx.jvmStats = jvmStats + return this + } + // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( @@ -258,11 +264,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } is Monitor -> { launch { + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 743d582e5..70979e3e4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -25,6 +25,29 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query + * index in document level monitor execution. + */ + val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", + 10, + 0, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document + * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate + * query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current + * execution + */ + val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", + 300000, 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, From 5bf9f7276e8133b532c41c168e268777838e9025 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 19 Feb 2024 13:28:20 -0800 Subject: [PATCH 2/3] fix stats logging Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 57 +++++++++++++++---- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 498602f34..f43beda55 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -166,7 +166,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() - val queryingStartTimeMillis = System.currentTimeMillis() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), @@ -212,7 +211,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val indexUpdatedRunContext = updateLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, - concreteIndexName + concreteIndexName, + nonPercolateSearchesTimeTaken ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -273,7 +273,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { updatedIndexNames, concreteIndicesSeenSoFar, inputRunResults, - docsToQueries + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried ) } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -346,6 +348,22 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + } finally { + logger.debug( + "PERF_DEBUG_STATS: Monitor ${monitor.id} " + + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}", + monitor.id, + percolateQueriesTimeTaken + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}", + monitor.id, + docTransformTimeTaken + ) + logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried) } } @@ -586,13 +604,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun updateLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, - index: String + index: String, + nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) updatedLastRunContext[shard] = maxSeqNo.toString() } return updatedLastRunContext @@ -629,7 +648,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { + private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -645,6 +664,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } + nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) if (response.hits.hits.isEmpty()) return -1L @@ -695,8 +715,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds + docIds, + nonPercolateSearchesTimeTaken ) + val startTime = System.currentTimeMillis() transformedDocs.addAll( transformSearchHitsAndReconstructDocs( hits, @@ -707,6 +729,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes ) ) + docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + @@ -727,7 +750,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorInputIndices, concreteIndices, inputRunResults, - docsToQueries + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried ) } } @@ -752,6 +777,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -760,7 +787,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor, monitorMetadata, concreteIndices, - monitorInputIndices + monitorInputIndices, + percolateQueriesTimeTaken ) percolateQueryResponseHits.forEach { hit -> @@ -774,7 +802,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } } - } finally { // no catch block because exception is caught and handled in runMonitor() class + totalDocsQueried.getAndAdd(transformedDocs.size.toLong()) + } finally { transformedDocs.clear() docsSizeInBytes.set(0) } @@ -790,7 +819,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo: Long?, maxSeqNo: Long, query: String?, - docIds: List? = null + docIds: List? = null, + nonPercolateSearchesTimeTaken: AtomicLong, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -818,8 +848,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { + logger.error("Failed search shard. Response: $response") throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } + nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) return response.hits } @@ -831,6 +863,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, + percolateQueriesTimeTaken: AtomicLong, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -882,6 +915,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Response status is ${response.status()}" ) } + logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + percolateQueriesTimeTaken.getAndAdd(response.took.millis) return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ From 25d722a7fc73fc988a0f6a8ecb12cd00119d2129 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 20 Feb 2024 23:40:46 -0800 Subject: [PATCH 3/3] add queryfieldnames field in findings mapping Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 89 ++++++------------- .../alerting/MonitorRunnerService.kt | 2 +- .../workflow/CompositeWorkflowRunner.kt | 2 +- .../alerting/alerts/finding_mapping.json | 3 + 4 files changed, 34 insertions(+), 62 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index f43beda55..0bd88c003 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -69,12 +69,20 @@ import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID -import java.util.concurrent.atomic.AtomicLong import java.util.stream.Collectors import kotlin.math.max -object DocumentLevelMonitorRunner : MonitorRunner() { +class DocumentLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) + var nonPercolateSearchesTimeTakenStat = 0L + var percolateQueriesTimeTakenStat = 0L + var totalDocsQueriedStat = 0L + var docTransformTimeTakenStat = 0L + var totalDocsSizeInBytesStat = 0L + var docsSizeOfBatchInBytes = 0L + /* Contains list of docs source that are held in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() override suspend fun runMonitor( monitor: Monitor, @@ -88,10 +96,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - var nonPercolateSearchesTimeTaken = AtomicLong(0) - var percolateQueriesTimeTaken = AtomicLong(0) - var totalDocsQueried = AtomicLong(0) - var docTransformTimeTaken = AtomicLong(0) + try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -160,10 +165,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - /* Contains list of docs source that are held in memory to submit to percolate query against query index. - * Docs are fetched from the source index per shard and transformed.*/ - val transformedDocs = mutableListOf>() - val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> @@ -212,7 +213,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexLastRunContext.toMutableMap(), monitorCtx, concreteIndexName, - nonPercolateSearchesTimeTaken ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -250,14 +250,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata, inputRunResults, docsToQueries, - transformedDocs, - docsSizeInBytes, updatedIndexNames, concreteIndicesSeenSoFar, - nonPercolateSearchesTimeTaken, - percolateQueriesTimeTaken, - totalDocsQueried, - docTransformTimeTaken ) } } @@ -266,16 +260,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (transformedDocs.isNotEmpty()) { performPercolateQueryAndResetCounters( monitorCtx, - transformedDocs, - docsSizeInBytes, monitor, monitorMetadata, updatedIndexNames, concreteIndicesSeenSoFar, inputRunResults, docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried ) } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -351,19 +341,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } finally { logger.debug( "PERF_DEBUG_STATS: Monitor ${monitor.id} " + - "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTakenStat" ) logger.debug( "PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}", monitor.id, - percolateQueriesTimeTaken + percolateQueriesTimeTakenStat ) logger.debug( "PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}", monitor.id, - docTransformTimeTaken + docTransformTimeTakenStat ) - logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried) + logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueriedStat) } } @@ -605,13 +595,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, index: String, - nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) updatedLastRunContext[shard] = maxSeqNo.toString() } return updatedLastRunContext @@ -648,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { + private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -664,7 +653,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } - nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) + nonPercolateSearchesTimeTakenStat += response.took.millis if (response.hits.hits.isEmpty()) return -1L @@ -692,14 +681,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - transformedDocs: MutableList>, - docsSizeInBytes: AtomicLong, monitorInputIndices: List, concreteIndices: List, - nonPercolateSearchesTimeTaken: AtomicLong, - percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong, - docTransformTimeTake: AtomicLong, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -715,8 +698,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds, - nonPercolateSearchesTimeTaken + docIds ) val startTime = System.currentTimeMillis() transformedDocs.addAll( @@ -726,10 +708,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndexName, monitor.id, conflictingFields, - docsSizeInBytes ) ) - docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) + docTransformTimeTakenStat += System.currentTimeMillis() - startTime } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + @@ -739,46 +720,37 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if ( transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) ) { performPercolateQueryAndResetCounters( monitorCtx, - transformedDocs, - docsSizeInBytes, monitor, monitorMetadata, monitorInputIndices, concreteIndices, inputRunResults, docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried ) } } } private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( - docsSizeInBytes: AtomicLong, numDocs: Int, monitorCtx: MonitorRunnerExecutionContext, ): Boolean { - return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeOfBatchInBytes, monitorCtx) || isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) } private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, - transformedDocs: MutableList>, - docsSizeInBytes: AtomicLong, monitor: Monitor, monitorMetadata: MonitorMetadata, monitorInputIndices: List, concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -788,7 +760,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata, concreteIndices, monitorInputIndices, - percolateQueriesTimeTaken ) percolateQueryResponseHits.forEach { hit -> @@ -802,10 +773,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } } - totalDocsQueried.getAndAdd(transformedDocs.size.toLong()) + totalDocsQueriedStat += transformedDocs.size.toLong() } finally { transformedDocs.clear() - docsSizeInBytes.set(0) + docsSizeOfBatchInBytes = 0 } } @@ -820,7 +791,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { maxSeqNo: Long, query: String?, docIds: List? = null, - nonPercolateSearchesTimeTaken: AtomicLong, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -851,7 +821,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.error("Failed search shard. Response: $response") throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } - nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) + nonPercolateSearchesTimeTakenStat += response.took.millis return response.hits } @@ -863,7 +833,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, - percolateQueriesTimeTaken: AtomicLong, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -916,7 +885,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") - percolateQueriesTimeTaken.getAndAdd(response.took.millis) + percolateQueriesTimeTakenStat += response.took.millis return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ @@ -934,7 +903,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndex: String, monitorId: String, conflictingFields: List, - docsSizeInBytes: AtomicLong, ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? { try { @@ -948,7 +916,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) - docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) + docsSizeOfBatchInBytes += sourceRef.ramBytesUsed() + totalDocsSizeInBytesStat += sourceRef.ramBytesUsed() return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) } catch (e: Exception) { logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) @@ -1031,7 +1000,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. */ - private data class TransformedDocDto( + data class TransformedDocDto( var indexName: String, var concreteIndexName: String, var docId: String, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index fe8e94734..4aa4c07f0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -321,7 +321,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else { QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index cfed18c89..a22b09bdc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -251,7 +251,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { executionId ) } else if (delegateMonitor.isDocLevelMonitor()) { - return DocumentLevelMonitorRunner.runMonitor( + return DocumentLevelMonitorRunner().runMonitor( delegateMonitor, monitorCtx, periodStart, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index d2ecc0907..1bfea4ebc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -49,6 +49,9 @@ }, "fields": { "type": "text" + }, + "query_field_names": { + "type": "keyword" } } },