diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
index 091549f85..557c6db88 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -370,6 +370,7 @@ class InputService(
         val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u }
             ?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString")
         val duration = when (unit) {
+            "s" -> Duration.ofSeconds(amount.toLong())
             "m" -> Duration.ofMinutes(amount.toLong())
             "h" -> Duration.ofHours(amount.toLong())
             "d" -> Duration.ofDays(amount.toLong())
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
index 486ea344e..665eb3e3c 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
@@ -1257,6 +1257,22 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
         }
     }
 
+    protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List<String>) {
+        data.forEachIndexed { i, value ->
+            val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)
+            val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)
+            val testDoc = """
+                {
+                  "test_strict_date_time": "$testTime",
+                  "test_field": "$value",
+                  "number": "$i"
+                }
+            """.trimIndent()
+            // Index documents with deterministic doc ids to allow easy targeted deletion during testing
+            indexDoc(index, (i + 1).toString(), testDoc)
+        }
+    }
+
     protected fun deleteDataWithDocIds(index: String, docIds: List<String>) {
         docIds.forEach {
             deleteDoc(index, it)
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
index 4140931d7..98c41e499 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt
@@ -13,6 +13,7 @@ import org.opensearch.alerting.model.destination.email.Email
 import org.opensearch.alerting.model.destination.email.Recipient
 import org.opensearch.alerting.util.DestinationType
 import org.opensearch.alerting.util.getBucketKeysHash
+import org.opensearch.client.Request
 import org.opensearch.client.ResponseException
 import org.opensearch.client.WarningFailureException
 import org.opensearch.common.settings.Settings
@@ -1190,7 +1191,16 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
         assertEquals("Incorrect search result", 2, buckets.size)
     }
 
-    fun `test execute bucket-level monitor with alias`() {
+    fun `test execute bucket-level monitor with alias optimization - indices not skipped`() {
+        val testIndex = createTestIndex()
+        insertSampleTimeSerializedDataCurrentTime(
+            testIndex,
+            listOf(
+                "test_value_3",
+                "test_value_4",
+                "test_value_5"
+            )
+        )
         val indexMapping = """
             "properties" : {
               "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
@@ -1200,7 +1210,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
         """.trimIndent()
         val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping)
         val aliasName = alias.keys.first()
-        insertSampleTimeSerializedData(
+        insertSampleTimeSerializedDataCurrentTime(
             aliasName,
             listOf(
                 "test_value_1",
@@ -1208,9 +1218,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
                 "test_value_2"
             )
         )
+        addIndexToAlias(testIndex, aliasName)
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10s")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
+        val compositeSources = listOf(
+            TermsValuesSourceBuilder("test_field").field("test_field")
+        )
+        val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+        val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
+        val triggerScript = """
+            params.docCount > 0
+        """.trimIndent()
+        var trigger = randomBucketLevelTrigger()
+        trigger = trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                name = trigger.id,
+                bucketsPathsMap = mapOf("docCount" to "_count"),
+                script = Script(triggerScript),
+                parentBucketPath = "composite_agg",
+                filter = null
+            )
+        )
+        val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger)))
+        val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR)
+        val output = entityAsMap(response)
+
+        assertEquals(monitor.name, output["monitor_name"])
+        @Suppress("UNCHECKED_CAST")
+        val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
+        assertEquals("Incorrect search result", 5, buckets.size)
+    }
+
+    fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() {
+        val testIndex = createTestIndex()
+        insertSampleTimeSerializedDataCurrentTime(
+            testIndex,
+            listOf(
+                "test_value_1",
+                "test_value_1", // adding duplicate to verify aggregation
+                "test_value_2"
+            )
+        )
+        Thread.sleep(10000) // let these documents age out of the 10-second query window used below
+        val indexMapping = """
+            "properties" : {
+              "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+              "test_field" : { "type" : "keyword" },
+              "number" : { "type" : "keyword" }
+            }
+        """.trimIndent()
+        val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping)
+        val aliasName = alias.keys.first()
+        insertSampleTimeSerializedDataCurrentTime(
+            aliasName,
+            listOf(
+                "test_value_1",
+                "test_value_1", // adding duplicate to verify aggregation
+                "test_value_2"
+            )
+        )
+        addIndexToAlias(testIndex, aliasName)
 
         val query = QueryBuilders.rangeQuery("test_strict_date_time")
-            .gt("{{period_end}}||-10d")
+            .gt("{{period_end}}||-10s")
             .lte("{{period_end}}")
             .format("epoch_millis")
         val compositeSources = listOf(
@@ -1241,7 +1315,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
         val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
         @Suppress("UNCHECKED_CAST")
         val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>>
-        assertEquals("Incorrect search result", 2, buckets.size)
+        Assert.assertTrue(buckets.size <= 2)
     }
 
     fun `test execute bucket-level monitor returns search result with multi term agg`() {
@@ -2240,4 +2314,21 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
     private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
         return this[key] as Map<String, Map<String, Any>>
     }
+
+    fun addIndexToAlias(index: String, alias: String) {
+        val request = Request("POST", "/_aliases")
+        request.setJsonEntity(
+            """{"actions": [{"add": {"index": "$index", "alias": "$alias"}}]}"""
+        )
+
+        try {
+            val response = client().performRequest(request)
+            if (response.statusLine.statusCode != RestStatus.OK.status) {
+                throw ResponseException(response)
+            }
+        } catch (e: Exception) {
+            // Rethrow so the calling test fails with context about the failed alias update
+            throw RuntimeException("Failed to add index to alias", e)
+        }
+    }
 }