diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ab90d29ad..39a359639 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -23,6 +23,7 @@ import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -663,7 +664,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if (transformedDocs.isNotEmpty() && isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + ) { performPercolateQueryAndResetCounters( monitorCtx, transformedDocs, @@ -679,6 +683,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + docsSizeInBytes: AtomicLong, + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } + private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, transformedDocs: MutableList>, @@ -918,15 +931,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() { */ private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) - if (thresholdPercentage > 100 || thresholdPercentage < 0) { - thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings) - } val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes return docsBytesSize > thresholdBytes } + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + return numDocs >= maxNumDocsThreshold + } + /** * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 54d5d5b82..c7d571824 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -31,6 +31,20 @@ class AlertingSettings { val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", 10, + 0, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document + * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate + * query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current + * execution + */ + val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", + 10000, 1000, Setting.Property.NodeScope, Setting.Property.Dynamic )