diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index f7383aaab..b3ddc9284 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -294,7 +294,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerSettings(settings) .registerThreadPool(threadPool) .registerAlertIndices(alertIndices) - .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings)) + .registerInputService( + InputService( + client, + scriptService, + namedWriteableRegistry, + xContentRegistry, + clusterService, + settings, + indexNameExpressionResolver + ) + ) .registerTriggerService(triggerService) .registerAlertService(alertService) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index c3cf90251..091549f85 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -15,12 +15,14 @@ import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.CrossClusterMonitorUtils +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.use import org.opensearch.client.Client +import 
org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput @@ -40,12 +42,14 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.ScriptType import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder +import java.time.Duration import java.time.Instant /** Service that handles the collection of input results for Monitor executions */ @@ -55,7 +59,8 @@ class InputService( val namedWriteableRegistry: NamedWriteableRegistry, val xContentRegistry: NamedXContentRegistry, val clusterService: ClusterService, - val settings: Settings + val settings: Settings, + val indexNameExpressionResolver: IndexNameExpressionResolver ) { private val logger = LogManager.getLogger(InputService::class.java) @@ -245,8 +250,9 @@ class InputService( .execute() val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService) + val resolvedIndexes = resolveOnlyQueryableIndicesFromLocalClusterAliases(monitor, periodEnd, searchInput.query.query(), indexes) val searchRequest = SearchRequest() - .indices(*indexes.toTypedArray()) + .indices(*resolvedIndexes.toTypedArray()) .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { @@ -256,6 +262,69 @@ class InputService( return searchRequest } + /** + * Resolves concrete indices from aliases based on a time range query and availability in the local cluster. 
+     *
+     * If an index passed to OpenSearch is an alias, this method will only select those indices
+     * resolved from the alias that meet the following criteria:
+     *
+     * 1. The index's creation date falls within the time range specified in the query's timestamp field.
+     * 2. The index immediately preceding the time range (by creation date) is also included.
+     *
+     *
This ensures that queries targeting aliases consider relevant indices based on their creation time, + * including the one immediately before the specified range to account for potential data at the boundary. + */ + private fun resolveOnlyQueryableIndicesFromLocalClusterAliases( + monitor: Monitor, + periodEnd: Instant, + query: QueryBuilder, + indexes: List, + ): List { + val resolvedIndexes = ArrayList() + indexes.forEach { + // we don't optimize for remote cluster aliases. we directly pass them to search request + if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService)) + resolvedIndexes.add(it) + else { + val state = clusterService.state() + if (IndexUtils.isAlias(it, state)) { + val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd) + if (resolveStartTimeOfQueryTimeRange != null) { + val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver) + val sortedIndices = indices + .mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index + .sortedBy { it.creationDate } // Sort by creation date + + var includePrevious = true + for (i in sortedIndices.indices) { + val indexMetadata = sortedIndices[i] + val creationDate = indexMetadata.creationDate + + if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) { + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false // No need to include previous anymore + } else if ( + includePrevious && i > 0 && sortedIndices[i - 1].creationDate < + resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) { + // Include the index immediately before the timestamp + resolvedIndexes.add(indexMetadata.index.name) + includePrevious = false + } + } + } else { + resolvedIndexes.add(it) + } + } else { + resolvedIndexes.add(it) + } + } + } + return resolvedIndexes + } + private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList> { logger.debug("ClusterMetricsInput 
clusterMetricType: {}", input.clusterMetricType) @@ -289,4 +358,52 @@ class InputService( } return results } + + fun resolveStartTimeofQueryTimeRange(monitor: Monitor, query: QueryBuilder, periodEnd: Instant): Instant? { + try { + val rangeQuery = findRangeQuery(query) ?: return null + val searchParameter = rangeQuery.from().toString() // we are looking for 'timeframe' variable {{period_end}}||- + + val timeframeString = searchParameter.substringAfter("||-") + val timeframeRegex = Regex("(\\d+)([a-zA-Z]+)") + val matchResult = timeframeRegex.find(timeframeString) + val (amount, unit) = matchResult?.destructured?.let { (a, u) -> a to u } + ?: throw IllegalArgumentException("Invalid timeframe format: $timeframeString") + val duration = when (unit) { + "m" -> Duration.ofMinutes(amount.toLong()) + "h" -> Duration.ofHours(amount.toLong()) + "d" -> Duration.ofDays(amount.toLong()) + else -> throw IllegalArgumentException("Invalid time unit: $unit") + } + + return periodEnd.minus(duration) + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id}:" + + " Failed to resolve time frame of search query while optimizing to query only on few of alias' concrete indices", + e + ) + return null // won't do optimization as we failed to resolve the timeframe due to unexpected error + } + } + + private fun findRangeQuery(queryBuilder: QueryBuilder?): RangeQueryBuilder? { + if (queryBuilder == null) return null + if (queryBuilder is RangeQueryBuilder) return queryBuilder + + if (queryBuilder is BoolQueryBuilder) { + for (clause in queryBuilder.must()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + for (clause in queryBuilder.should()) { + val rangeQuery = findRangeQuery(clause) + if (rangeQuery != null) return rangeQuery + } + // You can also check queryBuilder.filter() and queryBuilder.mustNot() if needed + } + + // Add handling for other query types if necessary (e.g., NestedQueryBuilder, etc.) 
+ return null + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt index 6ec14ffa2..b9ddaf187 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt @@ -227,5 +227,10 @@ class CrossClusterMonitorUtils { return if (clusterName.isNotEmpty()) "$clusterName:$indexName" else indexName } + + fun isRemoteClusterIndex(index: String, clusterService: ClusterService): Boolean { + val clusterName = parseClusterName(index) + return clusterName.isNotEmpty() && clusterService.clusterName.value() != clusterName + } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index d43a154fc..486ea344e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1032,6 +1032,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return createTestAlias(alias = alias, indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex)) } + protected fun createTestAlias( + alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), + numOfAliasIndices: Int = randomIntBetween(1, 10), + includeWriteIndex: Boolean = true, + indicesMapping: String, + ): MutableMap> { + return createTestAlias( + alias = alias, + indices = randomAliasIndices(alias, numOfAliasIndices, includeWriteIndex), + indicesMapping = indicesMapping + ) + } + protected fun createTestAlias( alias: String = randomAlphaOfLength(10).lowercase(Locale.ROOT), indices: Map = randomAliasIndices( @@ -1039,12 +1052,13 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { num = randomIntBetween(1, 10), includeWriteIndex = true ), - 
createIndices: Boolean = true + createIndices: Boolean = true, + indicesMapping: String = "" ): MutableMap> { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - if (createIndices) createTestIndex(index = it, mapping = "") + if (createIndices) createTestIndex(index = it, indicesMapping) val isWriteIndex = indices.getOrDefault(it, false) indicesMap[it] = isWriteIndex val indexMap = mapOf( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index c59bac0eb..4140931d7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1190,6 +1190,60 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } + fun `test execute bucket-level monitor with alias`() { + val indexMapping = """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + val alias = createTestAlias(randomAlphaOfLength(10), 10, true, indexMapping) + val aliasName = alias.keys.first() + insertSampleTimeSerializedData( + aliasName, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2" + ) + ) + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(aliasName), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + val triggerScript = 
""" + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ) + ) + val monitor = createMonitor(randomBucketLevelMonitor(inputs = listOf(input), enabled = false, triggers = listOf(trigger))) + val response = executeMonitor(monitor.id, params = DRYRUN_MONITOR) + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + } + fun `test execute bucket-level monitor returns search result with multi term agg`() { val index = "test_index_1234" indexDoc(