diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index f19075979..d195e4a8d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -303,7 +303,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerSettings(settings) .registerThreadPool(threadPool) .registerAlertIndices(alertIndices) - .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings)) + .registerInputService( + InputService( + client, + scriptService, + namedWriteableRegistry, + xContentRegistry, + clusterService, + settings, + indexNameExpressionResolver + ) + ) .registerTriggerService(triggerService) .registerAlertService(alertService) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index e7b107e2b..769d867c8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -15,11 +15,13 @@ import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.CrossClusterMonitorUtils +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput @@ -39,12 +41,14 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder +import java.time.Duration import java.time.Instant /** Service that handles the collection of input results for Monitor executions */ @@ -54,7 +58,8 @@ class InputService( val namedWriteableRegistry: NamedWriteableRegistry, val xContentRegistry: NamedXContentRegistry, val clusterService: ClusterService, - val settings: Settings + val settings: Settings, + val indexNameExpressionResolver: IndexNameExpressionResolver ) { private val logger = LogManager.getLogger(InputService::class.java) @@ -244,8 +249,19 @@ class InputService( .execute() val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService) + + val resolvedIndexes = if (searchInput.query.query() == null) indexes else { + val query = searchInput.query.query() + resolveOnlyQueryableIndicesFromLocalClusterAliases( + monitor, + periodEnd, + query, + indexes + ) + } + val searchRequest = SearchRequest() - .indices(*indexes.toTypedArray()) + .indices(*resolvedIndexes.toTypedArray()) .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { @@ -255,6 +271,72 @@ class InputService( return searchRequest } + /** + * Resolves concrete indices from aliases based on a time range query and availability in the local cluster. + * + * <p>If an index passed to OpenSearch is an alias, this method will only select those indices + * resolved from the alias that meet the following criteria: + * + * <ol> + * <li>The index's creation date falls within the time range specified in the query's timestamp field.</li> + * <li>The index immediately preceding the time range in terms of creation date is also included.</li> + * </ol> + * + * <p>This ensures that queries targeting aliases consider relevant indices based on their creation time, + * including the one immediately before the specified range to account for potential data at the boundary. + */ + private fun resolveOnlyQueryableIndicesFromLocalClusterAliases( + monitor: Monitor, + periodEnd: Instant, + query: QueryBuilder, + indexes: List<String>, + ): List<String> { + val resolvedIndexes = ArrayList<String>() + indexes.forEach { + // we don't optimize for remote cluster aliases. we directly pass them to search request + if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService)) + resolvedIndexes.add(it) + else { + val state = clusterService.state() + if (IndexUtils.isAlias(it, state)) { + val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd) + if (resolveStartTimeOfQueryTimeRange != null) { + val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver) + val sortedIndices = indices + .mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index + .sortedBy { it.creationDate } // Sort by creation date + + var includePrevious = true + for (i in sortedIndices.indices) { + val indexMetadata = sortedIndices[i] + val creationDate = indexMetadata.creationDate + + if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) { + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false // No need to include previous anymore + } else if ( + includePrevious && ( + i == sortedIndices.lastIndex || + sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) + ) { + // Include the index immediately before the timestamp + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false + } + } + } else { + // add alias without optimizing for resolve indices + resolvedIndexes.add(it) + } + } else { + resolvedIndexes.add(it) + } + } + } + return resolvedIndexes + } + private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> { logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) @@ -288,4 +370,53 @@ class InputService( } return results } + + fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? { + try { + val rangeQuery = findRangeQuery(query) ?: return null + val searchParameter = rangeQuery.from().toString() // we are looking for 'timeframe' variable {{period_end}}||-<timeframe> + + val timeframeString = searchParameter.substringAfter("||-") + val timeframeRegex = Regex("(\\d+)([a-zA-Z]+)") + val matchResult = timeframeRegex.find(timeframeString) + val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u } + ?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString") + val duration = when (unit) { + "s" -> Duration.ofSeconds(amount.toLong()) + "m" -> Duration.ofMinutes(amount.toLong()) + "h" -> Duration.ofHours(amount.toLong()) + "d" -> Duration.ofDays(amount.toLong()) + else -> throw IllegalArgumentException("Invalid time unit: $unit") + } + + return periodEnd.minus(duration) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id}:" + + " Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices", + e + ) + return null // won't do optimization as we failed to resolve the timeframe due to unexpected error + } + } + + private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? { + if (queryBuilder == null) return null + if (queryBuilder is RangeQueryBuilder) return queryBuilder + + if (queryBuilder is BoolQueryBuilder) { + for (clause in queryBuilder.must()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + for (clause in queryBuilder.should()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + // You can also check queryBuilder.filter() and queryBuilder.mustNot() if needed + } + + // Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.) + return null + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt index 6ec14ffa2..b9ddaf187 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt @@ -227,5 +227,10 @@ class CrossClusterMonitorUtils { return if (clusterName.isNotEmpty()) "$clusterName:$indexName" else indexName } + + fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean { + val clusterName = parseClusterName(index) + return clusterName.isNotEmpty() && clusterService.clusterName.value() != clusterName + } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index a0a9fe7df..a895e2ccf 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1052,6 +1052,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) } + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), + numOfAliasIndices: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true, + indicesMapping: String, + ): MutableMap<String, MutableMap<String, Boolean>> { + return createTestAlias( + alias = alias, + indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex), + indicesMapping = indicesMapping + ) + } + protected fun createTestAlias( alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), indices: Map<String, Boolean> = randomAliasIndices( @@ -1059,12 +1072,13 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { num = randomIntBetween(1, 10), includeWriteIndex = true ), - createIndices: Boolean = true + createIndices: Boolean = true, + indicesMapping: String = "" ): MutableMap<String, MutableMap<String, Boolean>> { val indicesMap = mutableMapOf<String, Boolean>() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - if (createIndices) createTestIndex(index = it, mapping = "") + if (createIndices) createTestIndex(index = it, indicesMapping) val isWriteIndex = indices.getOrDefault(it, false) indicesMap[it] = isWriteIndex val indexMap = mapOf( @@ -1263,6 +1277,41 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } + protected fun insertSampleTimeSerializedDataCurrentTime(index: String, data: List<String>) { + data.forEachIndexed { i, value -> + val time = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + + protected fun insertSampleTimeSerializedDataWithTime( + index: String, + data: List<String>, + time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS), + ) { + data.forEachIndexed { i, value -> + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + protected fun deleteDataWithDocIds(index: String, docIds: List<String>) { docIds.forEach { deleteDoc(index, it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index f71f51c67..390fc80f3 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.model.destination.email.Email import org.opensearch.alerting.model.destination.email.Recipient import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.client.Request import org.opensearch.client.ResponseException import org.opensearch.client.WarningFailureException import org.opensearch.common.settings.Settings @@ -1185,6 +1186,157 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } + fun `test execute bucket-level monitor with alias optimization - indices not skipped from query`() { + val skipIndex = createTestIndex("to_skip_index") + val previousIndex = createTestIndex("to_include_index") + + val indexMapping = """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) + val aliasName = alias.keys.first() + insertSampleTimeSerializedDataCurrentTime( + aliasName, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + insertSampleTimeSerializedDataWithTime( + previousIndex, + listOf( + "test_value_3", + "test_value_4", + "test_value_5" + ) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ) + ) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10s") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>> + Assert.assertEquals(buckets.size, 8) + } + + fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() { + val skipIndex = createTestIndex("to_skip_index") + Thread.sleep(10000) + val previousIndex = createTestIndex("to_include_index") + insertSampleTimeSerializedDataWithTime( + previousIndex, + listOf( + "test_value_3", + "test_value_4", + "test_value_5" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) + ) + Thread.sleep(10000) + val indexMapping = """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) + val aliasName = alias.keys.first() + insertSampleTimeSerializedDataCurrentTime( + aliasName, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10s") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List<Map<String, Any>> + Assert.assertTrue(buckets.size <= 5) + } + fun `test execute bucket-level monitor returns search result with multi term agg`() { val index = "test_index_1234" indexDoc( @@ -2175,4 +2327,21 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> { return this[key] as Map<String, Map<String, Any>> } + + fun addIndexToAlias(index: String, alias: String) { + val request = Request("POST", "/_aliases") + request.setJsonEntity( + """{"actions": [{"add": {"index": "$index","alias": "$alias"}} ]}""".trimIndent() + ) + + try { + val response = client().performRequest(request) + if (response.statusLine.statusCode != RestStatus.OK.status) { + throw ResponseException(response) + } + } catch (e: Exception) { + // Handle the exception appropriately, e.g., log it or rethrow + throw RuntimeException("Failed to add index to alias", e) + } + } }