From 92549fcee30c1aca75ce42ff1f58a1211ef7d194 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 17 Oct 2024 18:40:43 -0700 Subject: [PATCH 1/4] optimize bucket level monitor to resolve alias to query only those time-series indices that contain docs within timeframe of range query filter in search input Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 12 +- .../org/opensearch/alerting/InputService.kt | 121 +++++++++++++++++- .../alerting/util/CrossClusterMonitorUtils.kt | 5 + .../alerting/AlertingRestTestCase.kt | 18 ++- .../alerting/MonitorRunnerServiceIT.kt | 54 ++++++++ 5 files changed, 205 insertions(+), 5 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 5256eff51..1f8a4d514 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -295,7 +295,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerSettings(settings) .registerThreadPool(threadPool) .registerAlertIndices(alertIndices) - .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings)) + .registerInputService( + InputService( + client, + scriptService, + namedWriteableRegistry, + xContentRegistry, + clusterService, + settings, + indexNameExpressionResolver + ) + ) .registerTriggerService(triggerService) .registerAlertService(alertService) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index c3cf90251..091549f85 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -15,12 +15,14 @@ import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.CrossClusterMonitorUtils +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.use import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput @@ -40,12 +42,14 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder +import java.time.Duration import java.time.Instant /** Service that handles the collection of input results for Monitor 
executions */ @@ -55,7 +59,8 @@ class InputService( val namedWriteableRegistry: NamedWriteableRegistry, val xContentRegistry: NamedXContentRegistry, val clusterService: ClusterService, - val settings: Settings + val settings: Settings, + val indexNameExpressionResolver: IndexNameExpressionResolver ) { private val logger = LogManager.getLogger(InputService::class.java) @@ -245,8 +250,9 @@ class InputService( .execute() val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService) + val resolvedIndexes = resolveOnlyQueryableIndicesFromLocalClusterAliases(monitor, periodEnd, searchInput.query.query(), indexes) val searchRequest = SearchRequest() - .indices(*indexes.toTypedArray()) + .indices(*resolvedIndexes.toTypedArray()) .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { @@ -256,6 +262,69 @@ class InputService( return searchRequest } + /** + * Resolves concrete indices from aliases based on a time range query and availability in the local cluster. + * + *

If an index passed to OpenSearch is an alias, this method will only select those indices + * resolved from the alias that meet the following criteria: + * + *
  1. The index's creation date falls within the time range specified in the query's timestamp field. + *
  2. The index immediately preceding the time range in terms of creation date is also included. + * + *

This ensures that queries targeting aliases consider relevant indices based on their creation time, + * including the one immediately before the specified range to account for potential data at the boundary. + */ + private fun resolveOnlyQueryableIndicesFromLocalClusterAliases( + monitor: Monitor, + periodEnd: Instant, + query: QueryBuilder, + indexes: List, + ): List { + val resolvedIndexes = ArrayList() + indexes.forEach { + // we don't optimize for remote cluster aliases. we directly pass them to search request + if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService)) + resolvedIndexes.add(it) + else { + val state = clusterService.state() + if (IndexUtils.isAlias(it, state)) { + val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd) + if (resolveStartTimeOfQueryTimeRange != null) { + val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver) + val sortedIndices = indices + .mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index + .sortedBy { it.creationDate } // Sort by creation date + + var includePrevious = true + for (i in sortedIndices.indices) { + val indexMetadata = sortedIndices[i] + val creationDate = indexMetadata.creationDate + + if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) { + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false // No need to include previous anymore + } else if ( + includePrevious && i > 0 && sortedIndices[i - 1].creationDate < + resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) { + // Include the index immediately before the timestamp + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false + } + } + } else { + resolvedIndexes.add(it) + } + } else { + resolvedIndexes.add(it) + } + } + } + return resolvedIndexes + } + private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList> { logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) @@ -289,4 +358,52 @@ class InputService( } return results } + + fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? { + try { + val rangeQuery = findRangeQuery(query) ?: return null + val searchParameter = rangeQuery.from().toString() // we are looking for 'timeframe' variable {{period_end}}||- + + val timeframeString = searchParameter.substringAfter("||-") + val timeframeRegex = Regex("(\\d+)([a-zA-Z]+)") + val matchResult = timeframeRegex.find(timeframeString) + val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u } + ?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString") + val duration = when (unit) { + "m" -> Duration.ofMinutes(amount.toLong()) + "h" -> Duration.ofHours(amount.toLong()) + "d" -> Duration.ofDays(amount.toLong()) + else -> throw IllegalArgumentException("Invalid time unit: $unit") + } + + return periodEnd.minus(duration) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id}:" + + " Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices", + e + ) + return null // won't do optimization as we failed to resolve the timeframe due to unexpected error + } + } + + private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? 
{ + if (queryBuilder == null) return null + if (queryBuilder is RangeQueryBuilder) return queryBuilder + + if (queryBuilder is BoolQueryBuilder) { + for (clause in queryBuilder.must()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + for (clause in queryBuilder.should()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + // You can also check queryBuilder.filter() and queryBuilder.mustNot() if needed + } + + // Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.) + return null + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt index 6ec14ffa2..b9ddaf187 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt @@ -227,5 +227,10 @@ class CrossClusterMonitorUtils { return if (clusterName.isNotEmpty()) "$clusterName:$indexName" else indexName } + + fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean { + val clusterName = parseClusterName(index) + return clusterName.isNotEmpty() && clusterService.clusterName.value() != clusterName + } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 30ed82b5c..2aed4db3c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1000,6 +1000,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) } + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), + numOfAliasIndices: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true, + indicesMapping: String, + ): MutableMap> { + return createTestAlias( + alias = alias, + indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex), + indicesMapping = indicesMapping + ) + } + protected fun createTestAlias( alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), indices: Map = randomAliasIndices( @@ -1007,12 +1020,13 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { num = randomIntBetween(1, 10), includeWriteIndex = true ), - createIndices: Boolean = true + createIndices: Boolean = true, + indicesMapping: String = "" ): MutableMap> { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - if (createIndices) createTestIndex(index = it, mapping = "") + if (createIndices) createTestIndex(index = it, indicesMapping) val isWriteIndex = indices.getOrDefault(it, false) indicesMap[it] = isWriteIndex val indexMap = mapOf( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index c59bac0eb..4140931d7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1190,6 +1190,60 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } + fun `test execute 
bucket-level monitor with alias`() { + val indexMapping = """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) + val aliasName = alias.keys.first() + insertSampleTimeSerializedData( + aliasName, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + } + fun `test execute bucket-level monitor returns search result with multi term agg`() { val index = "test_index_1234" indexDoc( From 8bcba11b1c92aec8eb9e9ed1c7ef358e425b05a0 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 18 Oct 2024 01:26:40 -0700 Subject: [PATCH 2/4] add tests to verify alias based optimziation scenarios Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/InputService.kt | 1 + .../alerting/AlertingRestTestCase.kt | 16 +++ .../alerting/MonitorRunnerServiceIT.kt | 99 ++++++++++++++++++- 3 files changed, 112 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 091549f85..557c6db88 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -370,6 +370,7 @@ class InputService( val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u } ?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString") val duration = when (unit) { + "s" -> Duration.ofSeconds(amount.toLong()) "m" -> Duration.ofMinutes(amount.toLong()) "h" -> Duration.ofHours(amount.toLong()) "d" -> Duration.ofDays(amount.toLong()) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 2aed4db3c..bea6531e5 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ 
b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1225,6 +1225,22 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } + protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List) { + data.forEachIndexed { i, value -> + val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + protected fun deleteDataWithDocIds(index: String, docIds: List) { docIds.forEach { deleteDoc(index, it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 4140931d7..98c41e499 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.model.destination.email.Email import org.opensearch.alerting.model.destination.email.Recipient import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.client.Request import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException import org.opensearch.common.settings.Settings @@ -1190,7 +1191,16 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } - fun `test execute bucket-level monitor with alias`() { + fun `test execute bucket-level monitor with alias optimization - indices not skipped`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedDataCurrentTime( + testIndex, + listOf( + "test_value_3", + "test_value_4", // adding duplicate to verify aggregation + "test_value_5" + ) + ) val indexMapping = """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, @@ -1200,7 +1210,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { """.trimIndent() val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) val aliasName = alias.keys.first() - insertSampleTimeSerializedData( + insertSampleTimeSerializedDataCurrentTime( aliasName, listOf( "test_value_1", @@ -1208,9 +1218,73 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) + addIndexToAlias(testIndex, aliasName) + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10s") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = 
createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 5, buckets.size) + } + + fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() { + val testIndex = createTestIndex() + insertSampleTimeSerializedDataCurrentTime( + testIndex, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + Thread.sleep(10000) + val indexMapping = """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) + val aliasName = alias.keys.first() + insertSampleTimeSerializedDataCurrentTime( + aliasName, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + addIndexToAlias(testIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") - .gt("{{period_end}}||-10d") + .gt("{{period_end}}||-10s") .lte("{{period_end}}") .format("epoch_millis") val compositeSources = listOf( @@ -1241,7 +1315,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - assertEquals("Incorrect search result", 2, buckets.size) + Assert.assertTrue(buckets.size <= 2) } fun `test execute bucket-level monitor returns search result with multi term agg`() { @@ -2240,4 +2314,21 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { private fun Map.objectMap(key: String): Map> { return this[key] as Map> } + + fun addIndexToAlias(index: String, alias: String) { + val request = Request("POST", "/_aliases") + request.setJsonEntity( + """{"actions": [{"add": {"index": "$index","alias": "$alias"}} ]}""".trimIndent() + ) + + try { + val response = client().performRequest(request) + if (response.statusLine.statusCode != RestStatus.OK.status) { + throw ResponseException(response) + } + } catch (e: Exception) { + // Handle the exception appropriately, e.g., log it or rethrow + throw RuntimeException("Failed to add index to alias", e) + } + } } From 4578bc8391bc4a56a869766de2324c388d72cde6 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 21 Oct 2024 02:43:25 -0700 Subject: [PATCH 3/4] fix tests to verify aggregation query on alias optimziation with indices being skipped and not skipped Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/InputService.kt | 7 +- .../alerting/AlertingRestTestCase.kt | 19 ++++++ .../alerting/MonitorRunnerServiceIT.kt | 66 +++++++++++++------ 3 files changed, 69 insertions(+), 23 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 557c6db88..ccea577bc 100644 --- 
a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -306,8 +306,10 @@ class InputService( resolvedIndexes.add(indexMetadata.index.name) includePrevious = false // No need to include previous anymore } else if ( - includePrevious && i > 0 && sortedIndices[i - 1].creationDate < - resolveStartTimeOfQueryTimeRange.toEpochMilli() + includePrevious && ( + i == sortedIndices.lastIndex || + sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) ) { // Include the index immediately before the timestamp resolvedIndexes.add(indexMetadata.index.name) @@ -315,6 +317,7 @@ class InputService( } } } else { + // add alias without optimizing for resolve indices resolvedIndexes.add(it) } } else { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index bea6531e5..4ae4cc923 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1241,6 +1241,25 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } + protected fun insertSampleTimeSerializedDataWithTime( + index: String, + data: List, + time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS), + ) { + data.forEachIndexed { i, value -> + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + protected fun deleteDataWithDocIds(index: String, docIds: List) { docIds.forEach { deleteDoc(index, it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 98c41e499..c017bf4a2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1191,16 +1191,10 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } - fun `test execute bucket-level monitor with alias optimization - indices not skipped`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, - listOf( - "test_value_3", - "test_value_4", // adding duplicate to verify aggregation - "test_value_5" - ) - ) + fun `test execute bucket-level monitor with alias optimization - indices not skipped from query`() { + val skipIndex = createTestIndex("to_skip_index") + val previousIndex = createTestIndex("to_include_index") + val indexMapping = """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, @@ -1218,7 +1212,24 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + insertSampleTimeSerializedDataWithTime( + previousIndex, + listOf( + "test_value_3", + "test_value_4", + "test_value_5" + ) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ) + ) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = 
QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1251,18 +1262,30 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - assertEquals("Incorrect search result", 5, buckets.size) + Assert.assertEquals(buckets.size, 8) } fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, + val skipIndex = createTestIndex("to_skip_index") + Thread.sleep(10000) + val previousIndex = createTestIndex("to_include_index") + insertSampleTimeSerializedDataWithTime( + previousIndex, listOf( - "test_value_1", - "test_value_1", // adding duplicate to verify aggregation - "test_value_2" - ) + "test_value_3", + "test_value_4", + "test_value_5" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) ) Thread.sleep(10000) val indexMapping = """ @@ -1282,7 +1305,8 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1315,7 +1339,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - Assert.assertTrue(buckets.size <= 2) + Assert.assertTrue(buckets.size <= 5) } fun `test execute bucket-level monitor returns search result with multi term agg`() { From b8b37f05b61e6de1971d1f99c1babe0204d386dd Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Mon, 21 Oct 2024 12:42:22 -0700 Subject: [PATCH 4/4] null check addded Signed-off-by: Surya Sashank Nistala --- .../kotlin/org/opensearch/alerting/InputService.kt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index ccea577bc..c7eeac833 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -250,7 +250,17 @@ class InputService( .execute() val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService) - val resolvedIndexes = resolveOnlyQueryableIndicesFromLocalClusterAliases(monitor, periodEnd, searchInput.query.query(), indexes) + + val resolvedIndexes = if (searchInput.query.query() == null) indexes else { + val query = searchInput.query.query() + resolveOnlyQueryableIndicesFromLocalClusterAliases( + monitor, + periodEnd, + query, + indexes + ) + } + val searchRequest = SearchRequest() .indices(*resolvedIndexes.toTypedArray()) .preference(Preference.PRIMARY_FIRST.type())