From 914ca9ad730329532e4931cb450b93c743cd96ab Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 20 Dec 2023 00:22:41 -0800 Subject: [PATCH 01/16] refactor doc level monitor to perform a percolate queru for docs from each shard instead of performing one percolate query on docs from all shards Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 152 +++++++++++------- 1 file changed, 90 insertions(+), 62 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..272da7385 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -57,6 +57,7 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.percolator.PercolateQueryBuilderExt +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -219,39 +220,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs( + fetchDataAndExecutePercolateQueriesPerShard( monitor, monitorCtx, docExecutionContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), - matchingDocIdsPerIndex?.get(concreteIndexName) + matchingDocIdsPerIndex?.get(concreteIndexName), + monitorMetadata, + inputRunResults, + docsToQueries ) - - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, - monitor, - monitorMetadata, - updatedIndexName, - concreteIndexName - ) - - matchedQueriesForDocs.forEach { hit -> - val id = hit.id - .replace("_${updatedIndexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) - } - } - } } } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -598,17 +578,25 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - private suspend fun getMatchingDocs( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough) + * 2. Transform documents to submit to percolate query + * 3. perform percolate queries + * 4. update docToQueries Map with all hits from percolate queries + * */ + private suspend fun fetchDataAndExecutePercolateQueriesPerShard( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String, - concreteIndex: String, + indexName: String, + concreteIndexName: String, conflictingFields: List, - docIds: List? = null - ): List> { + docIds: List? 
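A note on the id cleanup in the percolate-hit loop above: doc-level queries are stored in the query index under ids that append the source index name and monitor id, so the original query id is recovered by stripping both suffix variants. A minimal sketch of that step, with the suffix shape inferred from the replace calls above rather than from the query-index mapping:

    fun stripQueryIdSuffixes(hitId: String, indexName: String, concreteIndexName: String, monitorId: String): String =
        hitId
            .replace("_${indexName}_$monitorId", "")         // suffix built from the index pattern/alias name
            .replace("_${concreteIndexName}_$monitorId", "") // suffix built from the concrete index name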
= null, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val matchingDocs = mutableListOf>() + val transformedDocs = mutableListOf>() for (i: Int in 0 until count) { val shard = i.toString() try { @@ -617,24 +605,59 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - concreteIndex, + concreteIndexName, shard, prevSeqNo, maxSeqNo, null, docIds ) + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields + ) + ) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id} :" + + " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + e + ) + } + + if (transformedDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries( + monitorCtx, + transformedDocs.map { it.second }, + monitor, + monitorMetadata, + indexName, + concreteIndexName + ) + + matchedQueriesForDocs.forEach { hit -> + val id = hit.id + .replace("_${indexName}_${monitor.id}", "") + .replace("_${concreteIndexName}_${monitor.id}", "") - if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + } } - } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") } } - return matchingDocs } + /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ private suspend fun searchShard( monitorCtx: MonitorRunnerExecutionContext, index: String, @@ -665,12 +688,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { SearchSourceBuilder() .version(true) .query(boolQueryBuilder) - .size(10000) // fixme: make this configurable. + .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) .preference(Preference.PRIMARY_FIRST.type()) val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search shard: $shard") + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } return response.hits } @@ -681,7 +704,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor: Monitor, monitorMetadata: MonitorMetadata, index: String, - concreteIndex: String + concreteIndex: String, ): SearchHits { val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) @@ -722,32 +745,37 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return response.hits } - private fun getAllDocs( + /** Transform field names and index names in all the search hits to format required to run percolate search against them. 
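searchShard() above restricts each fetch to the sequence-number window that has not been queried yet. A hedged sketch of such a request follows; the _seq_no range filter and the _shards preference routing are assumptions, since the diff only shows the size cap and the primary-first preference:

    import org.opensearch.action.search.SearchRequest
    import org.opensearch.index.query.QueryBuilders
    import org.opensearch.search.builder.SearchSourceBuilder

    fun buildShardFetchRequest(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long): SearchRequest =
        SearchRequest(index)
            .preference("_shards:$shard|_primary_first") // assumption: single-shard targeting via preference
            .source(
                SearchSourceBuilder()
                    .version(true)
                    .query(
                        QueryBuilders.boolQuery()
                            .filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo ?: -1L).lte(maxSeqNo))
                    )
                    .size(10000) // the fixed page size flagged by the fixme above
            )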
+ * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( hits: SearchHits, index: String, concreteIndex: String, monitorId: String, - conflictingFields: List + conflictingFields: List, ): List> { - return hits.map { hit -> - val sourceMap = hit.sourceAsMap - - transformDocumentFieldNames( - sourceMap, - conflictingFields, - "_${index}_$monitorId", - "_${concreteIndex}_$monitorId", - "" - ) - - var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) - - val sourceRef = BytesReference.bytes(xContentBuilder) - - logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) - - Pair(hit.id, sourceRef) - } + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = hit.sourceAsMap + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + logger.debug( + "Monitor $monitorId: Document [${hit.id}] payload after transform for percolate query: ${sourceRef.utf8ToString()}", + ) + return Pair(hit.id, sourceRef) + } catch (e: Exception) { + logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) } /** From d0dd9c4907cd75a4d58f1ea1a274097b48a0d0d7 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 20 Dec 2023 13:53:39 -0800 Subject: [PATCH 02/16] move transformedDocs into for loop in fetchDataAndExecutePercolateQueriesPerShard() Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 272da7385..c1ef77c82 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -596,8 +596,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries: MutableMap>, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val transformedDocs = mutableListOf>() for (i: Int in 0 until count) { + val transformedDocs = mutableListOf>() val shard = i.toString() try { val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() From 58d55b250d99969b5052f8be6305587f87b5b984 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 20 Dec 2023 14:09:25 -0800 Subject: [PATCH 03/16] rename percolate query execution method Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index c1ef77c82..a0d661465 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -630,7 +630,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if (transformedDocs.isNotEmpty()) { - val 
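After transformDocumentFieldNames() rewrites the conflicting field names, each document's source map is rendered back into a JSON payload for the percolate request, exactly as the builder calls above do. A standalone sketch of that serialization step; import paths assume OpenSearch 2.x and may differ on older lines:

    import org.opensearch.common.xcontent.XContentFactory
    import org.opensearch.core.common.bytes.BytesReference

    fun toJsonPayload(sourceMap: Map<String, Any?>): BytesReference {
        val builder = XContentFactory.jsonBuilder().map(sourceMap)
        return BytesReference.bytes(builder)
    }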
matchedQueriesForDocs = getMatchedQueries( + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( monitorCtx, transformedDocs.map { it.second }, monitor, @@ -639,7 +639,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndexName ) - matchedQueriesForDocs.forEach { hit -> + percolateQueryResponseHits.forEach { hit -> val id = hit.id .replace("_${indexName}_${monitor.id}", "") .replace("_${concreteIndexName}_${monitor.id}", "") @@ -698,7 +698,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return response.hits } - private suspend fun getMatchedQueries( + /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ + private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, docs: List, monitor: Monitor, @@ -727,7 +728,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - + logger.debug("Monitor ${monitor.id}: Executing percolate query for docs from source index $index against query index $queryIndex") var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -735,12 +736,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e + "Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " + + "and queryIndex [$queryIndex] for ${docs.size} document(s)", e ) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: $queryIndex") + throw IOException("Failed to search percolate index: $queryIndex. Response status is ${response.status()}") } return response.hits } From b86a403d3ec4b03ffd2189916f5d416e217eec49 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 20 Dec 2023 15:20:48 -0800 Subject: [PATCH 04/16] fix log messages Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index a0d661465..5fec8df7e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -742,7 +742,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: $queryIndex. Response status is ${response.status()}") + throw IOException( + "Monitor ${monitor.id}: Failed to search percolate index: $queryIndex. 
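The percolate search assembled here combines three clauses: a match on the stored query's source index, a match on the monitor id, and the percolator clause carrying the candidate document payloads. A minimal sketch of that combination, mirroring the builder calls shown above (import paths may vary by OpenSearch version):

    import org.opensearch.common.xcontent.XContentType
    import org.opensearch.core.common.bytes.BytesReference
    import org.opensearch.index.query.BoolQueryBuilder
    import org.opensearch.index.query.Operator
    import org.opensearch.index.query.QueryBuilders
    import org.opensearch.percolator.PercolateQueryBuilderExt

    fun buildPercolateQuery(index: String, monitorId: String, docPayloads: List<BytesReference>): BoolQueryBuilder =
        BoolQueryBuilder()
            .must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))
            .must(QueryBuilders.matchQuery("monitor_id", monitorId).operator(Operator.AND))
            .filter(PercolateQueryBuilderExt("query", docPayloads, XContentType.JSON))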
Response status is ${response.status()}" + ) } return response.hits } From 6f2990f2c50fd88708493b8654064895fe08a439 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 26 Dec 2023 16:28:28 -0800 Subject: [PATCH 05/16] add setting for percolate query docs size memory threshold to perform percolate query Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 3 + .../alerting/DocumentLevelMonitorRunner.kt | 120 ++++++++++++++---- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../alerting/MonitorRunnerService.kt | 6 + .../alerting/settings/AlertingSettings.kt | 10 ++ 5 files changed, 119 insertions(+), 22 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..489a846f9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -97,6 +97,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.painless.spi.Allowlist import org.opensearch.painless.spi.AllowlistLoader import org.opensearch.painless.spi.PainlessExtension @@ -257,6 +258,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerTriggerService(TriggerService(scriptService)) .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) + .registerJvmStats(JvmStats.jvmStats()) .registerWorkflowService(WorkflowService(client, xContentRegistry)) .registerConsumers() .registerDestinationSettings() @@ -314,6 +316,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_MAX_DOCS, AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5fec8df7e..596f59a43 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -56,6 +57,7 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit 
import org.opensearch.search.SearchHits @@ -64,6 +66,7 @@ import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID +import java.util.concurrent.atomic.AtomicLong import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -150,7 +153,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + /* Contains list of docs source that in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() + val docsSizeInBytes = AtomicLong(0) + var lastUpdatedIndexName: String? = null + var lastConcreteIndexName: String? = null docLevelMonitorInput.indices.forEach { indexName -> + var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), monitorCtx.clusterService!!, @@ -172,6 +182,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } val updatedIndexName = indexName.replace("*", "_") + lastUpdatedIndexName = updatedIndexName val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -179,6 +190,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices.forEach { concreteIndexName -> // Prepare lastRunContext for each index + lastConcreteIndexName = concreteIndexName val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { val isIndexCreatedRecently = createdRecently( monitor, @@ -230,7 +242,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { matchingDocIdsPerIndex?.get(concreteIndexName), monitorMetadata, inputRunResults, - docsToQueries + docsToQueries, + transformedDocs, + docsSizeInBytes ) } } @@ -564,6 +578,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(1) ) val response: SearchResponse = client.suspendUntil { client.search(request, it) } + JvmStats.jvmStats() if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } @@ -594,10 +609,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { - val transformedDocs = mutableListOf>() val shard = i.toString() try { val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() @@ -618,7 +634,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexName, concreteIndexName, monitor.id, - conflictingFields + conflictingFields, + docsSizeInBytes ) ) } catch (e: Exception) { @@ -628,30 +645,73 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - - if (transformedDocs.isNotEmpty()) { - val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + if (transformedDocs.isNotEmpty() || isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { + performPercolateQueryAndResetCounters( monitorCtx, - transformedDocs.map { it.second }, + transformedDocs, + docsSizeInBytes, monitor, monitorMetadata, indexName, - concreteIndexName + concreteIndexName, + inputRunResults, + docsToQueries ) + } + } + /* if all shards are covered still in-memory docs size limit is not breached we 
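This patch introduces a shared buffer plus a byte counter so the memory check can run cheaply after every shard: the counter is fed where each payload is serialized (ramBytesUsed() on the rendered source) and reset whenever a percolate call drains the buffer. A compact sketch of the accounting pattern, with ByteArray standing in for BytesReference:

    import java.util.concurrent.atomic.AtomicLong

    class DocBuffer {
        val docs = mutableListOf<Pair<String, ByteArray>>() // (docId, serialized source)
        val sizeInBytes = AtomicLong(0)

        fun add(docId: String, payload: ByteArray) {
            docs.add(docId to payload)
            sizeInBytes.getAndAdd(payload.size.toLong()) // the real code adds sourceRef.ramBytesUsed()
        }

        fun reset() {
            docs.clear()
            sizeInBytes.set(0)
        }
    }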
would need to submit + the percolate query at the end*/ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + indexName, + concreteIndexName, + inputRunResults, + docsToQueries + ) + } + } + + private suspend fun performPercolateQueryAndResetCounters( + monitorCtx: MonitorRunnerExecutionContext, + transformedDocs: MutableList>, + docsSizeInBytes: AtomicLong, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + indexName: String, + concreteIndexName: String, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { + try { + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + monitorCtx, + transformedDocs.map { it.second }, + monitor, + monitorMetadata, + indexName, + concreteIndexName + ) - percolateQueryResponseHits.forEach { hit -> - val id = hit.id - .replace("_${indexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") + percolateQueryResponseHits.forEach { hit -> + val id = hit.id + .replace("_${indexName}_${monitor.id}", "") + .replace("_${concreteIndexName}_${monitor.id}", "") - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) - } + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } } + } finally { // no catch block because exception is caught and handled in runMonitor() class + transformedDocs.clear() + docsSizeInBytes.set(0) } } @@ -737,7 +797,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } catch (e: Exception) { throw IllegalStateException( "Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " + - "and queryIndex [$queryIndex] for ${docs.size} document(s)", e + "and queryIndex [$queryIndex] for ${docs.size} document(s)", + e ) } @@ -757,6 +818,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndex: String, monitorId: String, conflictingFields: List, + docsSizeInBytes: AtomicLong, ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? { try { @@ -770,9 +832,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) - logger.debug( - "Monitor $monitorId: Document [${hit.id}] payload after transform for percolate query: ${sourceRef.utf8ToString()}", - ) + docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) return Pair(hit.id, sourceRef) } catch (e: Exception) { logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) @@ -832,4 +892,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } jsonAsMap.putAll(tempMap) } + + /** + * Returns true, if the docs fetched from shards thus far amount to less than threshold + * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. 
+ * + */ + private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + if (thresholdPercentage > 100 || thresholdPercentage < 0) { + thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings) + } + val heapMaxBytes = JvmStats.jvmStats().mem.heapMax.bytes + val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes + + return docsBytesSize > thresholdBytes + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 41a26bb79..3b17ceebe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext( var alertService: AlertService? = null, var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, + var jvmStats: JvmStats? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index ca223f7a0..a884199f8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -48,6 +48,7 @@ import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.core.action.ActionListener import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -132,6 +133,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService { + this.monitorCtx.jvmStats = jvmStats + return this + } + // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index e23d44c5b..54d5d5b82 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -24,6 +24,16 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** Defines the threshold of the docs accumulated in memory to query against percolate query index in document + * level monitor execution. 
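To make the threshold arithmetic above concrete: with the default 10 percent limit on an 8 GiB heap, flushing triggers once buffered payloads exceed roughly 0.8 GiB. A tiny worked sketch; the heap figure is illustrative, the real code reads it from JvmStats:

    fun thresholdBytes(heapMaxBytes: Long, thresholdPercentage: Int): Double =
        (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

    fun main() {
        val heapMaxBytes = 8L * 1024 * 1024 * 1024           // illustrative 8 GiB heap
        println(thresholdBytes(heapMaxBytes, 10).toLong())   // 858993459 bytes, ~0.8 GiB
    }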
The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. + */ + val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", + 10, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, From 92feae03f470d34db60d986061f89baea78fab7f Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 3 Jan 2024 19:04:28 -0800 Subject: [PATCH 06/16] perform percolate query only if threshold setting breached or at the end of collecting data from all indices Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 153 ++++++++++-------- 1 file changed, 89 insertions(+), 64 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 596f59a43..86fd18ff0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -67,6 +67,7 @@ import java.io.IOException import java.time.Instant import java.util.UUID import java.util.concurrent.atomic.AtomicLong +import java.util.stream.Collectors import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -153,12 +154,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - /* Contains list of docs source that in memory to submit to percolate query against query index. + /* Contains list of docs source that are held in memory to submit to percolate query against query index. * Docs are fetched from the source index per shard and transformed.*/ - val transformedDocs = mutableListOf>() + val transformedDocs = mutableListOf>() val docsSizeInBytes = AtomicLong(0) - var lastUpdatedIndexName: String? = null - var lastConcreteIndexName: String? 
= null + val concreteIndicesSeenSoFar = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( @@ -181,8 +181,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } } + concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") - lastUpdatedIndexName = updatedIndexName +// lastUpdatedIndexName = updatedIndexName val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -190,7 +191,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices.forEach { concreteIndexName -> // Prepare lastRunContext for each index - lastConcreteIndexName = concreteIndexName +// lastConcreteIndexName = concreteIndexName val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { val isIndexCreatedRecently = createdRecently( monitor, @@ -232,7 +233,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - fetchDataAndExecutePercolateQueriesPerShard( + fetchShardDataAndMaybeExecutePercolateQueries( monitor, monitorCtx, docExecutionContext, @@ -244,10 +245,27 @@ object DocumentLevelMonitorRunner : MonitorRunner() { inputRunResults, docsToQueries, transformedDocs, - docsSizeInBytes + docsSizeInBytes, + docLevelMonitorInput.indices, + concreteIndicesSeenSoFar ) } } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end*/ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + docLevelMonitorInput.indices, + concreteIndicesSeenSoFar, + inputRunResults, + docsToQueries + ) + } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -593,12 +611,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough) - * 2. Transform documents to submit to percolate query - * 3. perform percolate queries - * 4. update docToQueries Map with all hits from percolate queries - * */ - private suspend fun fetchDataAndExecutePercolateQueriesPerShard( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. 
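The control flow after this refactor is accumulate-then-flush: shard fetches append into one shared buffer, breaching the in-memory limit triggers an immediate percolate call, and a final flush after the last input drains whatever remains. A self-contained sketch of that shape; the two lambdas stand in for the real shard fetch and percolate steps:

    import java.util.concurrent.atomic.AtomicLong

    class BatchedPercolator(
        private val maxDocsInMemory: Int,
        private val maxBytesInMemory: Long,
        private val fetchAndTransform: (shard: Int) -> List<Pair<String, ByteArray>>,
        private val percolateAndRecordHits: (batch: List<Pair<String, ByteArray>>) -> Unit,
    ) {
        private val batch = mutableListOf<Pair<String, ByteArray>>()
        private val batchBytes = AtomicLong(0)

        fun processShards(shardCount: Int) {
            for (shard in 0 until shardCount) {
                val docs = fetchAndTransform(shard)
                docs.forEach { batchBytes.getAndAdd(it.second.size.toLong()) }
                batch.addAll(docs)
                val overLimit = batch.size >= maxDocsInMemory || batchBytes.get() > maxBytesInMemory
                if (batch.isNotEmpty() && overLimit) flush()
            }
            if (batch.isNotEmpty()) flush() // final drain, mirroring the end-of-input flush above
        }

        private fun flush() {
            percolateAndRecordHits(batch.toList())
            batch.clear()
            batchBytes.set(0)
        }
    }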
If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, @@ -609,8 +627,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - transformedDocs: MutableList>, + transformedDocs: MutableList>, docsSizeInBytes: AtomicLong, + monitorInputIndices: List, + concreteIndices: List, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -645,66 +665,50 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if (transformedDocs.isNotEmpty() || isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { + if (transformedDocs.isNotEmpty() && isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { performPercolateQueryAndResetCounters( monitorCtx, transformedDocs, docsSizeInBytes, monitor, monitorMetadata, - indexName, - concreteIndexName, + monitorInputIndices, + concreteIndices, inputRunResults, docsToQueries ) } } - /* if all shards are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end*/ - if (transformedDocs.isNotEmpty()) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - indexName, - concreteIndexName, - inputRunResults, - docsToQueries - ) - } } private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, - transformedDocs: MutableList>, + transformedDocs: MutableList>, docsSizeInBytes: AtomicLong, monitor: Monitor, monitorMetadata: MonitorMetadata, - indexName: String, - concreteIndexName: String, + monitorInputIndices: List, + concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( monitorCtx, - transformedDocs.map { it.second }, + transformedDocs, monitor, monitorMetadata, - indexName, - concreteIndexName + concreteIndices, + monitorInputIndices ) percolateQueryResponseHits.forEach { hit -> - val id = hit.id - .replace("_${indexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } docIndices.forEach { idx -> - val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } @@ -761,34 +765,42 @@ object DocumentLevelMonitorRunner : MonitorRunner() { /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, - docs: List, + docs: MutableList>, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String, - concreteIndex: String, + concreteIndices: List, + monitorInputIndices: List, ): SearchHits { - val boolQueryBuilder = 
BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) - - val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.termsQuery("index", indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) } boolQueryBuilder.filter(percolateQueryBuilder) - - val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] - if (queryIndex == null) { - val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + + " sourceIndices: $monitorInputIndices" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - logger.debug("Monitor ${monitor.id}: Executing percolate query for docs from source index $index against query index $queryIndex") + logger.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -796,15 +808,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " + - "and queryIndex [$queryIndex] for ${docs.size} document(s)", + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", e ) } if (response.status() !== RestStatus.OK) { throw IOException( - "Monitor ${monitor.id}: Failed to search percolate index: $queryIndex. Response status is ${response.status()}" + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" ) } return response.hits @@ -819,8 +833,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorId: String, conflictingFields: List, docsSizeInBytes: AtomicLong, - ): List> { - return hits.mapNotNull(fun(hit: SearchHit): Pair? { + ): List> { + return hits.mapNotNull(fun(hit: SearchHit): Pair? 
{ try { val sourceMap = hit.sourceAsMap transformDocumentFieldNames( @@ -833,7 +847,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) - return Pair(hit.id, sourceRef) + return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) } catch (e: Exception) { logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. @@ -903,9 +917,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (thresholdPercentage > 100 || thresholdPercentage < 0) { thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings) } - val heapMaxBytes = JvmStats.jvmStats().mem.heapMax.bytes + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes return docsBytesSize > thresholdBytes } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. + */ + private data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference + ) } From bfea7e6df88d7f762ba73d934b1cec676ce0e2eb Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 5 Jan 2024 10:59:01 -0800 Subject: [PATCH 07/16] change terms query clause on indices to list of should clauses as index is a text field in query index mapping Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 86fd18ff0..ab90d29ad 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -183,7 +183,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") -// lastUpdatedIndexName = updatedIndexName val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -191,7 +190,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices.forEach { concreteIndexName -> // Prepare lastRunContext for each index -// lastConcreteIndexName = concreteIndexName val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { val isIndexCreatedRecently = createdRecently( monitor, @@ -252,7 +250,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } /* if all indices are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end*/ + the percolate query at the end */ if (transformedDocs.isNotEmpty()) { performPercolateQueryAndResetCounters( monitorCtx, @@ -772,7 +770,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorInputIndices: List, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) - 
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.termsQuery("index", indices)) + val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { @@ -823,6 +821,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ + private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { + val boolQueryBuilder = QueryBuilders.boolQuery() + indices.forEach { boolQueryBuilder.should(QueryBuilders.matchQuery("index", it)) } + return boolQueryBuilder + } /** Transform field names and index names in all the search hits to format required to run percolate search against them. * Hits are transformed using method transformDocumentFieldNames() */ From 80c64df3471e0d8e7d853fab36a97fd6c40e73b9 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 10 Jan 2024 23:57:48 -0800 Subject: [PATCH 08/16] add setting and check on num docs in memory to determine if percolate query should be performed immediately Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 23 +++++++++++++++---- .../alerting/settings/AlertingSettings.kt | 14 +++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ab90d29ad..39a359639 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -23,6 +23,7 @@ import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -663,7 +664,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if (transformedDocs.isNotEmpty() && isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + ) { performPercolateQueryAndResetCounters( monitorCtx, transformedDocs, @@ -679,6 +683,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes: AtomicLong, + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } + private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, transformedDocs: MutableList>, @@ -918,15 +931,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() { */ private fun 
isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) - if (thresholdPercentage > 100 || thresholdPercentage < 0) { - thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings) - } val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes return docsBytesSize > thresholdBytes } + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + return numDocs >= maxNumDocsThreshold + } + /** * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 54d5d5b82..c7d571824 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -31,6 +31,20 @@ class AlertingSettings { val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", 10, + 0, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document + * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. 
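Together with the heap-percentage limit above, the doc-count cap defined here makes the flush decision an OR of two guards: estimated bytes against the heap budget, and buffered document count against the per-query cap. A one-look sketch of the combined predicate:

    fun shouldFlush(bufferedBytes: Long, bufferedDocs: Int, byteBudget: Long, maxDocsPerQuery: Int): Boolean =
        bufferedBytes > byteBudget || bufferedDocs >= maxDocsPerQuery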
When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate + * query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current + * execution + */ + val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", + 10000, 1000, Setting.Property.NodeScope, Setting.Property.Dynamic ) From 560156583a4dc3f23ec146c078c2fcaf2c18fe71 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 11 Jan 2024 01:04:50 -0800 Subject: [PATCH 09/16] use updated index names while process findings Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 39a359639..e5147a3e6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -160,6 +160,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val transformedDocs = mutableListOf>() val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() + val updatedIndexNames = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( @@ -184,6 +185,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") + updatedIndexNames.add(updatedIndexName) val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -245,7 +247,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries, transformedDocs, docsSizeInBytes, - docLevelMonitorInput.indices, + updatedIndexNames, concreteIndicesSeenSoFar ) } @@ -259,7 +261,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes, monitor, monitorMetadata, - docLevelMonitorInput.indices, + updatedIndexNames, concreteIndicesSeenSoFar, inputRunResults, docsToQueries From be29a78e688ada4e1db8501557c0b61817c7c6ef Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 11 Jan 2024 02:03:53 -0800 Subject: [PATCH 10/16] register setting in plugin class Signed-off-by: Surya Sashank Nistala --- .../src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 489a846f9..4f8e473d9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -317,6 +317,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, From 009cd6128697d00f79c40f3111dc9315a6404561 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 24 Jan 2024 17:10:49 -0800 
Subject: [PATCH 11/16] optimize to fetch only relevant fields from source data in doc level monitor to submit to percolate query instead of docs_source Signed-off-by: Surya Sashank Nistala --- alerting/build.gradle | 5 -- .../org/opensearch/alerting/AlertingPlugin.kt | 1 + .../alerting/DocumentLevelMonitorRunner.kt | 55 +++++++++++++++++-- .../alerting/settings/AlertingSettings.kt | 10 ++++ .../alerting/alerts/finding_mapping.json | 3 + core/build.gradle | 2 +- 6 files changed, 66 insertions(+), 10 deletions(-) diff --git a/alerting/build.gradle b/alerting/build.gradle index efaff1b6f..5f6bd9098 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -424,9 +424,4 @@ run { useCluster testClusters.integTest } -// Only apply jacoco test coverage if we are running a local single node cluster -if (!usingRemoteCluster && !usingMultiNode) { - apply from: '../build-tools/opensearchplugin-coverage.gradle' -} - apply from: '../build-tools/pkgbuild.gradle' diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4f8e473d9..e53187c7b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -338,6 +338,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index e5147a3e6..1d0473543 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException @@ -230,6 +231,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) } } + val fieldsToBeQueried = mutableSetOf() + + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." 
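The loop above unions every query's declared field names but falls back to full-source fetching the moment any query lacks them, since a single opaque query forces complete documents. A compact sketch of that all-or-nothing union, with DocLevelQuery reduced to the two relevant properties:

    data class DocLevelQuery(val id: String, val queryFieldNames: List<String>)

    fun fieldsToQuery(queries: List<DocLevelQuery>): Set<String> {
        val fields = mutableSetOf<String>()
        for (query in queries) {
            if (query.queryFieldNames.isEmpty()) return emptySet() // one opaque query disables the optimization
            fields.addAll(query.queryFieldNames)
        }
        return fields
    }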
+ ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) @@ -248,7 +264,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { transformedDocs, docsSizeInBytes, updatedIndexNames, - concreteIndicesSeenSoFar + concreteIndicesSeenSoFar, + ArrayList(fieldsToBeQueried) ) } } @@ -632,6 +649,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes: AtomicLong, monitorInputIndices: List, concreteIndices: List, + fieldsToBeQueried: List ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -647,7 +665,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds + docIds, + fieldsToBeQueried ) transformedDocs.addAll( transformSearchHitsAndReconstructDocs( @@ -742,7 +761,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo: Long?, maxSeqNo: Long, query: String?, - docIds: List? = null + docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -768,6 +788,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) .preference(Preference.PRIMARY_FIRST.type()) + + if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { + logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") @@ -834,6 +862,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Response status is ${response.status()}" ) } + logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}") return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ @@ -855,7 +884,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? 
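When the optimization is enabled, the shard search disables _source and requests each needed field explicitly, as the request mutation above shows. A sketch of that projection applied to a request; fetchField availability is assumed per recent OpenSearch versions:

    import org.opensearch.action.search.SearchRequest
    import org.opensearch.search.builder.SearchSourceBuilder

    fun applyFieldProjection(request: SearchRequest, fieldsToFetch: List<String>) {
        if (fieldsToFetch.isEmpty()) return // keep full _source when no projection is possible
        val source = request.source() ?: SearchSourceBuilder().also { request.source(it) }
        source.fetchSource(false)
        fieldsToFetch.forEach { source.fetchField(it) }
    }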
{
 try {
- val sourceMap = hit.sourceAsMap
+ val sourceMap = if (hit.hasSource()) {
+ hit.sourceAsMap
+ } else {
+ logger.error("PERF_DEBUG: Building percolate query source docs from relevant fields only")
+ constructSourceMapFromFieldsInHit(hit)
+ }
 transformDocumentFieldNames(
 sourceMap,
 conflictingFields,
@@ -875,6 +909,19 @@
 })
 }

+ private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
+ if (hit.fields == null)
+ return mutableMapOf()
+ val sourceMap: MutableMap<String, Any> = mutableMapOf()
+ for (field in hit.fields) {
+ if (field.value.values != null && field.value.values.isNotEmpty())
+ if (field.value.values.size == 1) {
+ sourceMap[field.key] = field.value.values[0]
+ } else sourceMap[field.key] = field.value.values
+ }
+ return sourceMap
+ }
+
 /**
 * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
 * but different mappings & [fieldNameSuffixPattern] to field names which have unique names.

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
index c7d571824..1bf2dc663 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt
@@ -48,6 +48,16 @@ class AlertingSettings {
 Setting.Property.NodeScope, Setting.Property.Dynamic
 )

+ /**
+ * Boolean setting to enable/disable optimizing doc level monitors by fetching only the fields mentioned in queries.
+ * Enabled by default. If disabled, the monitor will fetch the entire document source while fetching data from shards.
+ */
+ val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
+ "plugins.alerting.monitor.doc_level_monitor_fetch_only_query_fields_enabled",
+ true,
+ Setting.Property.NodeScope, Setting.Property.Dynamic
+ )
+
 val INPUT_TIMEOUT = Setting.positiveTimeSetting(
 "plugins.alerting.input_timeout",
 LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,

diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json
index d2ecc0907..1bfea4ebc 100644
--- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json
+++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json
@@ -49,6 +49,9 @@
 },
 "fields": {
 "type": "text"
+ },
+ "query_field_names": {
+ "type": "keyword"
 }
 }
 },

diff --git a/core/build.gradle b/core/build.gradle
index b1ecf7eac..7aeb8c284 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -19,7 +19,7 @@ dependencies {
 exclude group: 'com.google.guava'
 }
 implementation 'com.google.guava:guava:32.0.1-jre'
- api "org.opensearch:common-utils:${common_utils_version}@jar"
+ api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar")
 implementation 'commons-validator:commons-validator:1.7'
 testImplementation "org.opensearch.test:framework:${opensearch_version}"

From 743bda82179ac47706b30755eabc6f0190949188 Mon Sep 17 00:00:00 2001
From: Surya Sashank Nistala
Date: Fri, 26 Jan 2024 18:53:24 -0800
Subject: [PATCH 12/16] query shard up to max sequence number instead of just
 10000

Signed-off-by: Surya Sashank Nistala
---
 .../alerting/DocumentLevelMonitorRunner.kt | 100 +++++++++++-------
 .../alerting/MonitorMetadataService.kt | 1 +
 .../alerting/MonitorRunnerService.kt |
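Note: constructSourceMapFromFieldsInHit above rebuilds a pseudo _source from the hit's fetched fields so percolation can run without the full document. A simplified model of the same collapsing rule, with hit.fields modeled as Map<String, List<Any>> (the real DocumentField API differs slightly):

    // Single-valued fields unwrap to the bare value; multi-valued fields keep the list.
    fun toSourceMap(fields: Map<String, List<Any>>): MutableMap<String, Any> {
        val sourceMap = mutableMapOf<String, Any>()
        for ((name, values) in fields) {
            if (values.isEmpty()) continue
            sourceMap[name] = if (values.size == 1) values[0] else values
        }
        return sourceMap
    }

    // toSourceMap(mapOf("user" to listOf("alice"), "tags" to listOf("a", "b")))
    // -> {user=alice, tags=[a, b]}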
8 ++ .../alerting/util/DocLevelMonitorQueries.kt | 2 +- .../workflow/CompositeWorkflowRunner.kt | 6 +- .../opensearch/alerting/core/JobSweeper.kt | 6 +- 6 files changed, 79 insertions(+), 44 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 1d0473543..87b2fb985 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -33,7 +33,6 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -162,6 +161,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() + val queryingStartTimeMillis = System.currentTimeMillis() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( @@ -284,6 +284,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries ) } + val took = System.currentTimeMillis() - queryingStartTimeMillis + logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in $executionId") monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -303,6 +305,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { + logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor $monitor.id") docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) @@ -338,7 +341,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } else { onSuccessfulMonitorRun(monitorCtx, monitor) } - + logger.error( + "Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " + + "execution $executionId" + ) MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true @@ -533,7 +539,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { publishFinding(monitor, monitorCtx, finding) } catch (e: Exception) { // suppress exception - logger.error("Optional finding callback failed", e) } return finding.id } @@ -657,27 +662,58 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { + continue + } + var from = prevSeqNo ?: 0 + var numDocsLeftToQueryFromShard = maxSeqNo - from - val hits: SearchHits = searchShard( - monitorCtx, - concreteIndexName, - shard, - prevSeqNo, - maxSeqNo, - null, - docIds, - fieldsToBeQueried - ) - transformedDocs.addAll( - transformSearchHitsAndReconstructDocs( - hits, - indexName, + while (numDocsLeftToQueryFromShard > 0) { + val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000 + val hits: SearchHits = searchShard( + monitorCtx, concreteIndexName, - monitor.id, - conflictingFields, - docsSizeInBytes + shard, + from, + to, + null, + docIds, + fieldsToBeQueried ) - ) + from = to + 1 + numDocsLeftToQueryFromShard -= 10000 + val startTime = Instant.now() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields, + docsSizeInBytes + ) + ) + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries + ) + } + logger.error( + "PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " + + "took ${Instant.now().epochSecond - startTime.epochSecond}" + ) + } } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + @@ -685,22 +721,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if ( - transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) - ) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - monitorInputIndices, - concreteIndices, - inputRunResults, - docsToQueries - ) - } } } @@ -751,7 +771,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } - /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + /** Executes search query on given shard of given index to fetch docs with sequence number greater than prevSeqNo. 
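Note: the while loop above pages through a shard in 10,000-doc windows over _seq_no. A self-contained sketch of the windowing arithmetic; the lower bound of each window is exclusive, matching the gt(prevSeqNo) range filter, so the sketch advances `from` to `to` rather than `to + 1`:

    // Each pair is (exclusiveFrom, inclusiveTo); windows tile the range with no gaps.
    fun seqNoWindows(prevSeqNo: Long?, maxSeqNo: Long, window: Long = 10_000L): List<Pair<Long, Long>> {
        val windows = mutableListOf<Pair<Long, Long>>()
        var from = prevSeqNo ?: 0L
        while (from < maxSeqNo) {
            val to = minOf(from + window, maxSeqNo)
            windows.add(from to to)
            from = to
        }
        return windows
    }

    // seqNoWindows(null, 25_000L) -> [(0, 10000), (10000, 20000), (20000, 25000)]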
* This method hence fetches only docs from shard which haven't been queried before */ private suspend fun searchShard( @@ -787,7 +807,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) - .preference(Preference.PRIMARY_FIRST.type()) if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") @@ -833,7 +852,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } val searchRequest = - SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) + SearchRequest().indices(*queryIndices.toTypedArray()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) @@ -863,6 +882,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}") return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 5c8886686..9bd923e22 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -108,6 +108,7 @@ object MonitorMetadataService : primaryTerm = response.primaryTerm ) } catch (e: Exception) { + log.error("Failed to upsert metadata", e) throw AlertingException.wrap(e) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index a884199f8..b8719b4b0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -257,11 +257,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } is Monitor -> { launch { + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 42237853f..21e4d7077 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -395,7 +395,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } bulkResponse.forEach { bulkItemResponse -> if (bulkItemResponse.isFailed) { - log.debug(bulkItemResponse.failureMessage) + log.error("Failed to index doc level query for monitor $monitorId due to" + 
bulkItemResponse.failureMessage)
 }
 }
 }

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
index cfed18c89..f0253858b 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt
@@ -60,7 +60,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
 val isTempWorkflow = dryRun || workflow.id == Workflow.NO_ID

 val executionId = generateExecutionId(isTempWorkflow, workflow)
-
+ logger.debug(
+ "Workflow ${workflow.id} execution began at $workflowExecutionStartTime" +
+ " on node ${monitorCtx.clusterService!!.localNode().id}"
+ )
 val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
 workflow = workflow,
 skipIndex = isTempWorkflow,
@@ -227,6 +230,7 @@
 )
 }
 workflowRunResult.executionEndTime = Instant.now()
+ logger.debug("Workflow ${workflow.id} execution completed at ${workflowRunResult.executionEndTime}")
 return workflowRunResult
 }

diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt
index b67f278b2..f5a267388 100644
--- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt
+++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt
@@ -462,11 +462,13 @@ class JobSweeper(
 private fun isOwningNode(shardId: ShardId, jobId: JobId): Boolean {
 val localNodeId = clusterService.localNode().id
- val shardNodeIds = clusterService.state().routingTable.shardRoutingTable(shardId)
+ val shardRoutingTable = clusterService.state().routingTable.shardRoutingTable(shardId)
+ val shardNodeIds = shardRoutingTable
 .filter { it.active() }
 .map { it.currentNodeId() }
 val shardNodes = ShardNodes(localNodeId, shardNodeIds)
- return shardNodes.isOwningNode(jobId)
+ val owningNode = shardNodes.isOwningNode(jobId)
+ return owningNode
 }
 }

From d324f8deda5f0ff488e74f5053c07bd878d83b21 Mon Sep 17 00:00:00 2001
From: Surya Sashank Nistala
Date: Fri, 2 Feb 2024 01:09:07 -0800
Subject: [PATCH 13/16] add stats

Signed-off-by: Surya Sashank Nistala
---
 .../alerting/DocumentLevelMonitorRunner.kt | 184 +++++++++++-------
 .../transport/TransportIndexMonitorAction.kt | 12 ++
 2 files changed, 126 insertions(+), 70 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index 87b2fb985..979a156cd 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -8,8 +8,10 @@ package org.opensearch.alerting
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
-import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
@@ -86,7 +88,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
 logger.debug("Document-level-monitor is running ...")
 val
isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - + var nonPercolateSearchesTimeTaken = AtomicLong(0) + var percolateQueriesTimeTaken = AtomicLong(0) + var totalDocsQueried = AtomicLong(0) try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -208,7 +212,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val indexUpdatedRunContext = updateLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, - concreteIndexName + concreteIndexName, + nonPercolateSearchesTimeTaken ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -265,7 +270,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes, updatedIndexNames, concreteIndicesSeenSoFar, - ArrayList(fieldsToBeQueried) + ArrayList(fieldsToBeQueried), + nonPercolateSearchesTimeTaken, + percolateQueriesTimeTaken, + totalDocsQueried ) } } @@ -281,7 +289,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { updatedIndexNames, concreteIndicesSeenSoFar, inputRunResults, - docsToQueries + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried ) } val took = System.currentTimeMillis() - queryingStartTimeMillis @@ -305,11 +315,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor $monitor.id") - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) - } + logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { monitor.triggers.forEach { @@ -363,6 +370,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + } finally { + logger.error( + "PERF_DEBUG_STATS: Monitor ${monitor.id} " + + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" + ) + logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken") + logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried") } } @@ -401,7 +415,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { trigger: DocumentLevelTrigger, monitor: Monitor, idQueryMap: Map, - docsToQueries: Map>, + docsToQueries: MutableMap>, queryToDocIds: Map>, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, @@ -410,35 +424,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) - val findings = mutableListOf() - val findingDocPairs = mutableListOf>() + val triggerFindingDocPairs = mutableListOf>() // TODO: Implement throttling for findings - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} - val findingId = createFindings( - monitor, - monitorCtx, - triggeredQueries, - it.key, - !dryrun && monitor.id != Monitor.NO_ID, - executionId - ) - findings.add(findingId) + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) - if (triggerResult.triggeredDocs.contains(it.key)) { - findingDocPairs.add(Pair(findingId, it.key)) + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) } } val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - relatedFindings = findings, + relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) val alerts = mutableListOf() - findingDocPairs.forEach { + triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), listOf(it.second), @@ -500,47 +512,65 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docLevelQueries: List, - matchingDocId: String, + docsToQueries: MutableMap>, + idQueryMap: Map, shouldCreateFinding: Boolean, workflowExecutionId: String? = null, - ): String { - // Before the "|" is the doc id and after the "|" is the index - val docIndex = matchingDocId.split("|") + ): List> { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocIds = listOf(docIndex[0]), - correlatedDocIds = listOf(docIndex[0]), - monitorId = monitor.id, - monitorName = monitor.name, - index = docIndex[1], - docLevelQueries = docLevelQueries, - timestamp = Instant.now(), - executionId = workflowExecutionId - ) + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! 
} - if (shouldCreateFinding) { - val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + logger.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) + .opType(DocWriteRequest.OpType.CREATE) } } - try { - publishFinding(monitor, monitorCtx, finding) - } catch (e: Exception) { - // suppress exception + if (indexRequests.isNotEmpty()) { + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + } } - return finding.id + return findingDocPairs } private fun publishFinding( @@ -563,13 +593,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun updateLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, - index: String + index: String, + nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) updatedLastRunContext[shard] = maxSeqNo.toString() } return updatedLastRunContext @@ -606,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. 
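Note: createFindings now accumulates one IndexRequest per finding and flushes them in a single bulk call, instead of one indexing round trip per finding. A minimal sketch of that batching pattern; findingsJson (mapping finding id to its serialized form) and buildFindingsBulk are illustrative names, and import paths follow the diff above and may differ by OpenSearch version:

    import org.opensearch.action.DocWriteRequest
    import org.opensearch.action.bulk.BulkRequest
    import org.opensearch.action.index.IndexRequest
    import org.opensearch.action.support.WriteRequest
    import org.opensearch.common.xcontent.XContentType

    fun buildFindingsBulk(findingsJson: Map<String, String>, findingsIndex: String): BulkRequest {
        // One refresh for the whole batch instead of one per document.
        val bulk = BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        findingsJson.forEach { (id, json) ->
            bulk.add(
                IndexRequest(findingsIndex)
                    .source(json, XContentType.JSON)
                    .id(id)
                    .routing(id)
                    .opType(DocWriteRequest.OpType.CREATE) // fail on id collision rather than overwrite
            )
        }
        return bulk
    }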
*/ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { + private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -625,7 +656,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if (response.hits.hits.isEmpty()) return -1L - + nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) return response.hits.hits[0].seqNo } @@ -654,7 +685,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsSizeInBytes: AtomicLong, monitorInputIndices: List, concreteIndices: List, - fieldsToBeQueried: List + fieldsToBeQueried: List, + nonPercolateSearchesTimeTaken: AtomicLong, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -678,7 +712,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { to, null, docIds, - fieldsToBeQueried + fieldsToBeQueried, + nonPercolateSearchesTimeTaken ) from = to + 1 numDocsLeftToQueryFromShard -= 10000 @@ -706,7 +741,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorInputIndices, concreteIndices, inputRunResults, - docsToQueries + docsToQueries, + percolateQueriesTimeTaken, + totalDocsQueried ) } logger.error( @@ -743,6 +780,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, + percolateQueriesTimeTaken: AtomicLong, + totalDocsQueried: AtomicLong ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -751,7 +790,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor, monitorMetadata, concreteIndices, - monitorInputIndices + monitorInputIndices, + percolateQueriesTimeTaken ) percolateQueryResponseHits.forEach { hit -> @@ -765,6 +805,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } } + totalDocsQueried.getAndAdd(transformedDocs.size.toLong()) } finally { // no catch block because exception is caught and handled in runMonitor() class transformedDocs.clear() docsSizeInBytes.set(0) @@ -783,6 +824,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { query: String?, docIds: List? = null, fieldsToFetch: List, + nonPercolateSearchesTimeTaken: AtomicLong, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -819,6 +861,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: [$shard] in index [$index]. 
Response status is ${response.status()}") } + nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) return response.hits } @@ -830,6 +873,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, + percolateQueriesTimeTaken: AtomicLong ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -881,8 +925,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Response status is ${response.status()}" ) } - logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}") - logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}") + logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response") + percolateQueriesTimeTaken.getAndAdd(response.took.millis) return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ @@ -907,7 +952,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val sourceMap = if (hit.hasSource()) { hit.sourceAsMap } else { - logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only") constructSourceMapFromFieldsInHit(hit) } transformDocumentFieldNames( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 49743b3f0..12e0c481b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -189,6 +189,18 @@ class TransportIndexMonitorAction @Inject constructor( else (it as DocLevelMonitorInput).indices indices.addAll(inputIndices) } + if ( + indices.size == 1 && ( + IndexUtils.isAlias(indices[0], clusterService.state()) || + IndexUtils.isDataStream(indices[0], clusterService.state()) + ) + ) { + val metadata = clusterService.state().metadata.indicesLookup[indices[0]]?.writeIndex + if (metadata != null) { + indices.removeAt(0) + indices.add(metadata.index.name) + } + } val searchRequest = SearchRequest().indices(*indices.toTypedArray()) .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) client.search( From da6afbca0a466c7d656c1c6b63cd067eed42dc22 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 8 Feb 2024 15:51:08 -0800 Subject: [PATCH 14/16] optimize on compute max seq nos and remove all redundant search requests Signed-off-by: Surya Sashank Nistala --- .../alerting/DocumentLevelMonitorRunner.kt | 118 +++++++++--------- .../model/DocumentExecutionContext.kt | 14 --- .../alerting/model/IndexExecutionContext.kt | 18 +++ 3 files changed, 78 insertions(+), 72 deletions(-) delete mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 979a156cd..daeb22945 
100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -16,8 +16,8 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest -import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult @@ -60,7 +60,8 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders -import org.opensearch.monitor.jvm.JvmStats +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indices.IndexClosedException import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits @@ -91,6 +92,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var nonPercolateSearchesTimeTaken = AtomicLong(0) var percolateQueriesTimeTaken = AtomicLong(0) var totalDocsQueried = AtomicLong(0) + var docTransformTimeTaken = AtomicLong(0) try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -209,11 +211,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( + val indexUpdatedRunContext = initializeNewLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, - concreteIndexName, - nonPercolateSearchesTimeTaken + concreteIndexName ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -251,18 +252,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } fieldsToBeQueried.addAll(it.queryFieldNames) } - - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - - fetchShardDataAndMaybeExecutePercolateQueries( - monitor, - monitorCtx, - docExecutionContext, + logger.error("PERF_DEBUG: Monitor ${monitor.id} Query field names: ${fieldsToBeQueried.joinToString()}}") + val indexExecutionContext = IndexExecutionContext( + queries, + indexLastRunContext, + indexUpdatedRunContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + ) + + fetchShardDataAndMaybeExecutePercolateQueries( + monitor, + monitorCtx, + indexExecutionContext, monitorMetadata, inputRunResults, docsToQueries, @@ -273,8 +277,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ArrayList(fieldsToBeQueried), nonPercolateSearchesTimeTaken, percolateQueriesTimeTaken, - totalDocsQueried - ) + totalDocsQueried, + docTransformTimeTaken + ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number + indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + } } } /* if all indices are covered still in-memory docs size limit is not breached we would need to submit 
@@ -295,7 +302,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } val took = System.currentTimeMillis() - queryingStartTimeMillis - logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in $executionId") + logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in $executionId") monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -376,6 +383,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" ) logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken") + logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on transforming doc fields in millis: $docTransformTimeTaken") logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried") } } @@ -590,18 +598,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } - private suspend fun updateLastRunContext( + private fun initializeNewLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, index: String, - nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) - updatedLastRunContext[shard] = maxSeqNo.toString() + updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString() } return updatedLastRunContext } @@ -650,7 +656,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(1) ) val response: SearchResponse = client.suspendUntil { client.search(request, it) } - JvmStats.jvmStats() if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } @@ -673,11 +678,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docExecutionCtx: DocumentExecutionContext, - indexName: String, - concreteIndexName: String, - conflictingFields: List, - docIds: List? 
= null, + indexExecutionCtx: IndexExecutionContext, monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, @@ -688,43 +689,47 @@ object DocumentLevelMonitorRunner : MonitorRunner() { fieldsToBeQueried: List, nonPercolateSearchesTimeTaken: AtomicLong, percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong + totalDocsQueried: AtomicLong, + docTransformTimeTake: AtomicLong, + updateLastRunContext: (String, String) -> Unit ) { - val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int + val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { val shard = i.toString() try { - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { - continue - } - var from = prevSeqNo ?: 0 - var numDocsLeftToQueryFromShard = maxSeqNo - from - - while (numDocsLeftToQueryFromShard > 0) { - val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000 + val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED + var to: Long = Long.MAX_VALUE + while (to >= from) { val hits: SearchHits = searchShard( monitorCtx, - concreteIndexName, + indexExecutionCtx.concreteIndexName, shard, from, to, - null, - docIds, + indexExecutionCtx.docIds, fieldsToBeQueried, nonPercolateSearchesTimeTaken ) - from = to + 1 - numDocsLeftToQueryFromShard -= 10000 - val startTime = Instant.now() + if (hits.hits.isEmpty()) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) + break + } + if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed + + updateLastRunContext(shard, hits.hits[0].seqNo.toString()) + to = hits.hits[0].seqNo - 10000L + } else { + to -= 10000L + } + val startTime = System.currentTimeMillis() transformedDocs.addAll( transformSearchHitsAndReconstructDocs( hits, - indexName, - concreteIndexName, + indexExecutionCtx.indexName, + indexExecutionCtx.concreteIndexName, monitor.id, - conflictingFields, + indexExecutionCtx.conflictingFields, docsSizeInBytes ) ) @@ -746,17 +751,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() { totalDocsQueried ) } - logger.error( - "PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " + - "took ${Instant.now().epochSecond - startTime.epochSecond}" - ) + docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) } } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + - " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + + "Error: ${e.message}", e ) + if (e is IndexClosedException) { + throw e + } } } } @@ -821,7 +827,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String?, docIds: List? 
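Note: the rewritten loop above discovers the shard's max seq_no from the first descending-sorted page (avoiding a separate max-seq-no search) and then steps the upper bound down by the page size. A self-contained condensation of that walk; fetchPage is a hypothetical stand-in for searchShard that returns seq_nos in (from, to], highest first, and the step-down arithmetic assumes densely packed seq_nos (sparse ranges re-fetch rather than skip):

    fun walkShard(from: Long, liveMaxSeqNo: Long, pageSize: Long = 10_000L) {
        val fetchPage = { lo: Long, hi: Long ->
            (minOf(hi, liveMaxSeqNo) downTo lo + 1).take(pageSize.toInt()).toList()
        }
        var to = Long.MAX_VALUE
        while (to >= from) {
            val page = fetchPage(from, to)
            if (page.isEmpty()) break // nothing newer than the checkpoint
            if (to == Long.MAX_VALUE) {
                println("new checkpoint (max seq_no) = ${page.first()}")
            }
            to = (if (to == Long.MAX_VALUE) page.first() else to) - pageSize
            println("processing seq_nos $page")
        }
    }

    // walkShard(from = 0L, liveMaxSeqNo = 25L, pageSize = 10L) visits 25..16, 15..6, 5..1.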
= null, fieldsToFetch: List, nonPercolateSearchesTimeTaken: AtomicLong, @@ -832,10 +837,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (query != null) { - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) - } - if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } @@ -846,12 +847,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .source( SearchSourceBuilder() .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) .query(boolQueryBuilder) .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { - logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") request.source().fetchSource(false) for (field in fieldsToFetch) { request.source().fetchField(field) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt deleted file mode 100644 index 0caad1f4a..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.model - -import org.opensearch.commons.alerting.model.DocLevelQuery - -data class DocumentExecutionContext( - val queries: List, - val lastRunContext: Map, - val updatedLastRunContext: Map -) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt new file mode 100644 index 000000000..97156eb96 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.DocLevelQuery + +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, + val updatedLastRunContext: MutableMap, + val indexName: String, + val concreteIndexName: String, + val conflictingFields: List, + val docIds: List? = null +) From d7580b391dece5886fe29f8ed27e0453423a394e Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 8 Feb 2024 15:55:43 -0800 Subject: [PATCH 15/16] todo Signed-off-by: Surya Sashank Nistala --- .../kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index e960b9da5..9268f5616 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -52,6 +52,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant import java.util.UUID +//TODO raise PR for bucket level monitor optimization also. 
dont miss
 object BucketLevelMonitorRunner : MonitorRunner() {
 private val logger = LogManager.getLogger(javaClass)

From ad57b5671ecbf6672ca3770c60bb00fb65b7d95d Mon Sep 17 00:00:00 2001
From: Surya Sashank Nistala
Date: Fri, 16 Feb 2024 16:21:36 -0800
Subject: [PATCH 16/16] bucket level monitor: optimize to read only from write
 index when alias is configured

Signed-off-by: Surya Sashank Nistala
---
 .../alerting/BucketLevelMonitorRunner.kt | 2 +-
 .../org/opensearch/alerting/InputService.kt | 31 ++++++++++++++++---
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
index 9268f5616..bf7f5953c 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -52,7 +52,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
 import java.time.Instant
 import java.util.UUID

-//TODO raise PR for bucket level monitor optimization also. dont miss
+// TODO raise PR for bucket level monitor optimization also. dont miss
 object BucketLevelMonitorRunner : MonitorRunner() {
 private val logger = LogManager.getLogger(javaClass)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
index aa385d989..f7e011a99 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -13,6 +13,7 @@ import org.opensearch.alerting.model.TriggerAfterKey
 import org.opensearch.alerting.opensearchapi.convertToMap
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.util.AggregationQueryRewriter
+import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.addUserBackendRolesFilter
 import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
 import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
@@ -20,7 +21,6 @@ import org.opensearch.alerting.util.getRoleFilterEnabled
 import org.opensearch.alerting.util.use
 import org.opensearch.alerting.workflow.WorkflowRunContext
 import org.opensearch.client.Client
-import org.opensearch.cluster.routing.Preference
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.io.stream.BytesStreamOutput
 import org.opensearch.common.settings.Settings
@@ -102,8 +102,7 @@ class InputService(
 .execute()

 val searchRequest = SearchRequest()
- .indices(*input.indices.toTypedArray())
- .preference(Preference.PRIMARY_FIRST.type())
+ .indices(*resolveIndices(monitor, input.indices).toTypedArray())

 XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
 searchRequest.source(SearchSourceBuilder.fromXContent(it))
 }
@@ -197,7 +196,6 @@

 val searchRequest = SearchRequest()
 .indices(*input.indices.toTypedArray())
- .preference(Preference.PRIMARY_FIRST.type())

 XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
 searchRequest.source(SearchSourceBuilder.fromXContent(it))
 }
@@ -224,4 +222,29 @@
 InputRunResults(emptyList(), e)
 }
 }
+
+ /** accepts a list of index names and returns a list where:
+ * i. if it's a concrete index or an index pattern - no changes are made
+ * ii.
if it's a data stream or an alias - we return the active write index
+ */
+ private fun resolveIndices(
+ monitor: Monitor,
+ indices: List<String>,
+ ): List<String> {
+ val resolvedIndices = mutableListOf<String>()
+ for (it in indices) {
+ if (IndexUtils.isAlias(it, clusterService.state()) || IndexUtils.isDataStream(it, clusterService.state())) {
+ val writeIndex = IndexUtils.getWriteIndex(it, clusterService.state())
+ if (writeIndex == null) {
+ logger.error("Monitor ${monitor.id}: write index not found for $it")
+ continue
+ }
+ resolvedIndices.add(writeIndex)
+ // TODO add edge case where periodStart < writeIndex's creationTime
+ } else {
+ resolvedIndices.add(it)
+ }
+ }
+ return resolvedIndices
+ }
 }
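Note: the net effect of resolveIndices, illustrated with hypothetical index names:

    // resolveIndices(monitor, listOf("logs-alias", "app-logs-*", "orders-000001"))
    //   -> [".ds-logs-000004", "app-logs-*", "orders-000001"]
    //   assuming "logs-alias" currently writes to the hypothetical backing index ".ds-logs-000004"

Reading only the write index keeps bucket-level queries from fanning out over every rolled-over backing index; the trade-off, flagged by the TODO above, is that a periodStart earlier than the write index's creation time can miss documents routed to older backing indices.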