diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 9df20dbc5..e975df333 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings @@ -323,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 39a837337..ff939418a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -701,16 +701,16 @@ class DocumentLevelMonitorRunner : MonitorRunner() { fieldsToBeQueried, ) if (hits.hits.isEmpty()) { - updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) + if (to == Long.MAX_VALUE) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs + } break } if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed - updateLastRunContext(shard, hits.hits[0].seqNo.toString()) - to = hits.hits[0].seqNo - 10000L - } else { - to -= 10000L } + val leastSeqNoFromHits = hits.hits.last().seqNo + to = leastSeqNoFromHits - 1 val startTime = System.currentTimeMillis() transformedDocs.addAll( transformSearchHitsAndReconstructDocs( @@ -841,7 +841,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .sort("_seq_no", SortOrder.DESC) .seqNoAndPrimaryTerm(true) .query(boolQueryBuilder) - .size(10000) + .size(monitorCtx.docLevelMonitorShardFetchSize) ) .preference(Preference.PRIMARY_FIRST.type()) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 043ae88d4..424656c6b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -55,4 +55,6 @@ data class MonitorRunnerExecutionContext( @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + @Volatile var docLevelMonitorShardFetchSize: Int = + AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 9cd3c2401..a8f0a5f41 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT @@ -202,6 +203,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it } + monitorCtx.docLevelMonitorShardFetchSize = + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) { + monitorCtx.docLevelMonitorShardFetchSize = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 19059dc1d..5039ec329 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -20,6 +20,7 @@ class AlertingSettings { const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 + const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -38,6 +39,16 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** Purely a setting used to verify seq_no calculation + */ + val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting( + "plugins.alerting.monitor.doc_level_monitor_shard_fetch_size", + DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, + 1, + 10000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 0c5eaf1c4..ed0668daf 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -137,6 +137,49 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Alert saved for test monitor", 0, alerts.size) } + fun `test seq_no calculation correctness when docs are deleted`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.key, 2) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) + indexDoc(index, "11", testDoc) + indexDoc(index, "21", testDoc) + indexDoc(index, "31", testDoc) + indexDoc(index, "41", testDoc) + indexDoc(index, "51", testDoc) + + deleteDoc(index, "51") + val response = executeMonitor(monitor, params = mapOf("dryrun" to "false")) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(9, triggerResult.objectMap("action_results").values.size) + } + } + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) @@ -163,6 +206,10 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) val response = executeMonitor(monitor, params = DRYRUN_MONITOR)