diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
index a40a9c9c2..b8962b29d 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
@@ -304,7 +304,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
- .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
+ .registerInputService(
+ InputService(
+ client,
+ scriptService,
+ namedWriteableRegistry,
+ xContentRegistry,
+ clusterService,
+ settings,
+ indexNameExpressionResolver
+ )
+ )
.registerTriggerService(triggerService)
.registerAlertService(alertService)
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
index e7b107e2b..769d867c8 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -15,11 +15,13 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
+import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.client.Client
+import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.io.stream.BytesStreamOutput
@@ -39,12 +41,14 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilder
import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
+import java.time.Duration
import java.time.Instant
/** Service that handles the collection of input results for Monitor executions */
@@ -54,7 +58,8 @@ class InputService(
val namedWriteableRegistry: NamedWriteableRegistry,
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
- val settings: Settings
+ val settings: Settings,
+ val indexNameExpressionResolver: IndexNameExpressionResolver
) {
private val logger = LogManager.getLogger(InputService::class.java)
@@ -244,8 +249,19 @@ class InputService(
.execute()
val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(searchInput.indices, clusterService)
+
+ val resolvedIndexes = if (searchInput.query.query() == null) indexes else {
+ val query = searchInput.query.query()
+ resolveOnlyQueryableIndicesFromLocalClusterAliases(
+ monitor,
+ periodEnd,
+ query,
+ indexes
+ )
+ }
+
val searchRequest = SearchRequest()
- .indices(*indexes.toTypedArray())
+ .indices(*resolvedIndexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
@@ -255,6 +271,72 @@ class InputService(
return searchRequest
}
+ /**
+ * Resolves concrete indices from aliases based on a time range query and availability in the local cluster.
+ *
+ *
If an index passed to OpenSearch is an alias, this method will only select those indices
+ * resolved from the alias that meet the following criteria:
+ *
+ *
+ * - The index's creation date falls within the time range specified in the query's timestamp field.
+ * - The index immediately preceding the time range in terms of creation date is also included.
+ *
+ *
+ * This ensures that queries targeting aliases consider relevant indices based on their creation time,
+ * including the one immediately before the specified range to account for potential data at the boundary.
+ */
+ private fun resolveOnlyQueryableIndicesFromLocalClusterAliases(
+ monitor: Monitor,
+ periodEnd: Instant,
+ query: QueryBuilder,
+ indexes: List,
+ ): List {
+ val resolvedIndexes = ArrayList()
+ indexes.forEach {
+ // we don't optimize for remote cluster aliases. we directly pass them to search request
+ if (CrossClusterMonitorUtils.isRemoteClusterIndex(it, clusterService))
+ resolvedIndexes.add(it)
+ else {
+ val state = clusterService.state()
+ if (IndexUtils.isAlias(it, state)) {
+ val resolveStartTimeOfQueryTimeRange = resolveStartTimeofQueryTimeRange(monitor, query, periodEnd)
+ if (resolveStartTimeOfQueryTimeRange != null) {
+ val indices = IndexUtils.resolveAllIndices(listOf(it), clusterService, indexNameExpressionResolver)
+ val sortedIndices = indices
+ .mapNotNull { state.metadata().index(it) } // Get IndexMetadata for each index
+ .sortedBy { it.creationDate } // Sort by creation date
+
+ var includePrevious = true
+ for (i in sortedIndices.indices) {
+ val indexMetadata = sortedIndices[i]
+ val creationDate = indexMetadata.creationDate
+
+ if (creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()) {
+ resolvedIndexes.add(indexMetadata.index.name)
+ includePrevious = false // No need to include previous anymore
+ } else if (
+ includePrevious && (
+ i == sortedIndices.lastIndex ||
+ sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli()
+ )
+ ) {
+ // Include the index immediately before the timestamp
+ resolvedIndexes.add(indexMetadata.index.name)
+ includePrevious = false
+ }
+ }
+ } else {
+ // add alias without optimizing for resolve indices
+ resolvedIndexes.add(it)
+ }
+ } else {
+ resolvedIndexes.add(it)
+ }
+ }
+ }
+ return resolvedIndexes
+ }
+
private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList