diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index f69151b13..1eb826622 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
 
         try {
             // Resolve all passed indices to concrete indices
-            val concreteIndices = IndexUtils.resolveAllIndices(
+            val allConcreteIndices = IndexUtils.resolveAllIndices(
                 docLevelMonitorInput.indices,
                 monitorCtx.clusterService!!,
                 monitorCtx.indexNameExpressionResolver!!
             )
-            if (concreteIndices.isEmpty()) {
+            if (allConcreteIndices.isEmpty()) {
                 logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
                 throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
             }
@@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             // cleanup old indices that are not monitored anymore from the same monitor
             val runContextKeys = updatedLastRunContext.keys.toMutableSet()
             for (ind in runContextKeys) {
-                if (!concreteIndices.contains(ind)) {
+                if (!allConcreteIndices.contains(ind)) {
                     updatedLastRunContext.remove(ind)
                 }
             }
@@ -150,11 +150,28 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
 
             docLevelMonitorInput.indices.forEach { indexName ->
-                val concreteIndices = IndexUtils.resolveAllIndices(
+                var concreteIndices = IndexUtils.resolveAllIndices(
                     listOf(indexName),
                     monitorCtx.clusterService!!,
                     monitorCtx.indexNameExpressionResolver!!
                 )
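+                // For aliases and datastreams, find the write index recorded in the previous run context and
+                // keep only backing indices created on or after it; older read-only indices are skipped.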
+                var lastWriteIndex: String? = null
+                if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
+                    IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
+                ) {
+                    lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
+                    if (lastWriteIndex != null) {
+                        val lastWriteIndexCreationDate =
+                            IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
+                        concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
+                            concreteIndices,
+                            monitorCtx.clusterService!!.state(),
+                            lastWriteIndexCreationDate
+                        )
+                    }
+                }
                 val updatedIndexName = indexName.replace("*", "_")
                 val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
                     monitorCtx.clusterService!!.state(),
@@ -179,7 +196,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                         monitorCtx,
                         concreteIndexName
                     ) as MutableMap<String, Any>
-                    updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
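+                    // For aliases and datastreams, track the run context only for the current write index
+                    // and drop the entry that was stored for the previous write index.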
+                    if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
+                        IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
+                    ) {
+                        if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
+                            updatedLastRunContext.remove(lastWriteIndex)
+                            updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
+                        }
+                    } else {
+                        updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
+                    }
 
                     val count: Int = indexLastRunContext["shards_count"] as Int
                     for (i: Int in 0 until count) {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
index e4c5a318c..c1a5c9aea 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt
@@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.settings.AlertingSettings
 import org.opensearch.alerting.util.AlertingException
+import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.client.Client
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.settings.Settings
@@ -216,11 +217,21 @@ object MonitorMetadataService :
         val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
         try {
             if (index == null) return mutableMapOf()
-            val getIndexRequest = GetIndexRequest().indices(index)
-            val getIndexResponse: GetIndexResponse = client.suspendUntil {
-                client.admin().indices().getIndex(getIndexRequest, it)
+
+            val indices = mutableListOf<String>()
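+            // For an alias or datastream input, seed the run context from the current write index only;
+            // otherwise resolve the expression and include every matching concrete index.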
+            if (IndexUtils.isAlias(index, clusterService.state()) ||
+                IndexUtils.isDataStream(index, clusterService.state())
+            ) {
+                IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
+            } else {
+                val getIndexRequest = GetIndexRequest().indices(index)
+                val getIndexResponse: GetIndexResponse = client.suspendUntil {
+                    client.admin().indices().getIndex(getIndexRequest, it)
+                }
+                indices.addAll(getIndexResponse.indices())
             }
-            val indices = getIndexResponse.indices()
 
             indices.forEach { indexName ->
                 if (!lastRunContext.containsKey(indexName)) {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
index 6dc7c12ba..96e0d8f63 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt
@@ -207,11 +207,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
 
         // Run through each backing index and apply appropriate mappings to query index
         indices.forEach { indexName ->
-            val concreteIndices = IndexUtils.resolveAllIndices(
+            var concreteIndices = IndexUtils.resolveAllIndices(
                 listOf(indexName),
                 monitorCtx.clusterService!!,
                 monitorCtx.indexNameExpressionResolver!!
             )
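+            // For aliases and datastreams, limit the mapping updates to the previous run's write index
+            // and any backing indices created after it.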
+            if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
+                IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
+            ) {
+                val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
+                if (lastWriteIndex != null) {
+                    val lastWriteIndexCreationDate =
+                        IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
+                    concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
+                        concreteIndices,
+                        monitorCtx.clusterService!!.state(),
+                        lastWriteIndexCreationDate
+                    )
+                }
+            }
             val updatedIndexName = indexName.replace("*", "_")
             val updatedProperties = mutableMapOf<String, Any>()
             val allFlattenPaths = mutableSetOf<Pair<String, String>>()
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
index 4e34ed3c2..387f5cb22 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
@@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices
 import org.opensearch.alerting.core.ScheduledJobIndices
 import org.opensearch.client.IndicesAdminClient
 import org.opensearch.cluster.ClusterState
+import org.opensearch.cluster.metadata.IndexAbstraction
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.metadata.IndexNameExpressionResolver
 import org.opensearch.cluster.service.ClusterService
@@ -153,5 +154,48 @@ class IndexUtils {
 
             return result
         }
+
+        @JvmStatic
+        fun isDataStream(name: String, clusterState: ClusterState): Boolean {
+            return clusterState.metadata().dataStreams().containsKey(name)
+        }
+
+        @JvmStatic
+        fun isAlias(name: String, clusterState: ClusterState): Boolean {
+            return clusterState.metadata().hasAlias(name)
+        }
+
+        @JvmStatic
+        fun getWriteIndex(index: String, clusterState: ClusterState): String? {
+            if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
+                val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
+                if (metadata != null) {
+                    return metadata.index.name
+                }
+            }
+            return null
+        }
+
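+        /** Returns the members of [concreteIndices] that are concrete indices created on or after [thresholdDate]. */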
+        @JvmStatic
+        fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
+            val filteredIndices = mutableListOf<String>()
+            val lookup = clusterState.metadata().indicesLookup
+            concreteIndices.forEach { indexName ->
+                val index = lookup[indexName]
+                val indexMetadata = clusterState.metadata.index(indexName)
+                if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
+                    if (indexMetadata.creationDate >= thresholdDate) {
+                        filteredIndices.add(indexName)
+                    }
+                }
+            }
+            return filteredIndices
+        }
+
+        @JvmStatic
+        fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
+            return clusterState.metadata.index(index).creationDate
+        }
     }
 }
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
index 9622efb31..40d7c37fb 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt
@@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler
 import javax.management.ObjectName
 import javax.management.remote.JMXConnectorFactory
 import javax.management.remote.JMXServiceURL
+import kotlin.collections.ArrayList
+import kotlin.collections.HashMap
 
 /**
  * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
@@ -909,7 +911,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
     private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
         val requestBody = StringEntity(doc, APPLICATION_JSON)
         val params = if (refresh) mapOf("refresh" to "true") else mapOf()
-        val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
+        val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
         assertTrue(
             "Unable to index doc: '${doc.take(15)}...' to index: '$index'",
             listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
@@ -945,6 +947,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
         return index
     }
 
+    protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
+        createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
+        return index
+    }
+
     protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String {
         try {
             createIndex(
@@ -981,7 +988,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
         val indicesMap = mutableMapOf<String, Boolean>()
         val indicesJson = jsonBuilder().startObject().startArray("actions")
         indices.keys.map {
-            val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
+            val indexName = createTestIndex(index = it, mapping = "")
             val isWriteIndex = indices.getOrDefault(indexName, false)
             indicesMap[indexName] = isWriteIndex
             val indexMap = mapOf(
@@ -998,17 +1005,156 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
         return mutableMapOf(alias to indicesMap)
     }
 
+    protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
+        val indexPattern = "$datastream*"
+        var componentTemplateMappings = "\"properties\": {" +
+            "  \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
+            "  \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
+            "}"
+        if (mappings != null) {
+            componentTemplateMappings = mappings
+        }
+        if (useComponentTemplate) {
+            // Setup index_template
+            createComponentTemplateWithMappings(
+                "my_ds_component_template-$datastream",
+                componentTemplateMappings
+            )
+        }
+        createComposableIndexTemplate(
+            "my_index_template_ds-$datastream",
+            listOf(indexPattern),
+            (if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
+            mappings,
+            true,
+            0
+        )
+        createDataStream(datastream)
+    }
+
+    protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
+        client().makeRequest("PUT", "_data_stream/$datastream")
+    }
+
+    protected fun deleteDataStream(datastream: String) {
+        client().makeRequest("DELETE", "_data_stream/$datastream")
+    }
+
+    protected fun createIndexAlias(alias: String, mappings: String?) {
+        val indexPattern = "$alias*"
+        var componentTemplateMappings = "\"properties\": {" +
+            "  \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
+            "  \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
+            "}"
+        if (mappings != null) {
+            componentTemplateMappings = mappings
+        }
+        createComponentTemplateWithMappings(
+            "my_alias_component_template-$alias",
+            componentTemplateMappings
+        )
+        createComposableIndexTemplate(
+            "my_index_template_alias-$alias",
+            listOf(indexPattern),
+            "my_alias_component_template-$alias",
+            mappings,
+            false,
+            0
+        )
+        createTestIndex(
+            "$alias-000001",
+            null,
+            """
+            "$alias": {
+              "is_write_index": true
+            }
+            """.trimIndent()
+        )
+    }
+
+    protected fun deleteIndexAlias(alias: String) {
+        client().makeRequest("DELETE", "$alias*/_alias/$alias")
+    }
+
+    protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
+        val body = """{ "template": { "mappings": {$mappings} } }"""
+        client().makeRequest(
+            "PUT",
+            "_component_template/$componentTemplateName",
+            emptyMap(),
+            StringEntity(body, ContentType.APPLICATION_JSON),
+            BasicHeader("Content-Type", "application/json")
+        )
+    }
+
+    protected fun createComposableIndexTemplate(
+        templateName: String,
+        indexPatterns: List<String>,
+        componentTemplateName: String?,
+        mappings: String?,
+        isDataStream: Boolean,
+        priority: Int
+    ) {
+        var body = "{\n"
+        if (isDataStream) {
+            body += "\"data_stream\": { },"
+        }
+        body += "\"index_patterns\": [" +
+            indexPatterns.stream().collect(
+                Collectors.joining("\",\"", "\"", "\"")
+            ) + "],"
+        if (componentTemplateName == null) {
+            body += "\"template\": {\"mappings\": {$mappings}},"
+        }
+        if (componentTemplateName != null) {
+            body += "\"composed_of\": [\"$componentTemplateName\"],"
+        }
+        body += "\"priority\":$priority}"
+        client().makeRequest(
+            "PUT",
+            "_index_template/$templateName",
+            emptyMap(),
+            StringEntity(body, APPLICATION_JSON),
+            BasicHeader("Content-Type", "application/json")
+        )
+    }
+
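+    // Returns the data stream's current write index, i.e. the last backing index reported by GET _data_stream.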
+    protected fun getDatastreamWriteIndex(datastream: String): String {
+        val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
+        var respAsMap = responseAsMap(response)
+        if (respAsMap.containsKey("data_streams")) {
+            respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
+            val indices = respAsMap["indices"] as List<Map<String, Any>>
+            val index = indices.last()
+            return index["index_name"] as String
+        } else {
+            respAsMap = respAsMap[datastream] as Map<String, Object>
+        }
+        val indices = respAsMap["indices"] as Array<String>
+        return indices.last()
+    }
+
+    protected fun rolloverDatastream(datastream: String) {
+        client().makeRequest(
+            "POST",
+            datastream + "/_rollover",
+            emptyMap(),
+            null
+        )
+    }
+
     protected fun randomAliasIndices(
         alias: String,
         num: Int = randomIntBetween(1, 10),
         includeWriteIndex: Boolean = true,
     ): Map<String, Boolean> {
         val indices = mutableMapOf<String, Boolean>()
-        val writeIndex = randomIntBetween(0, num)
+        val writeIndex = randomIntBetween(0, num - 1)
         for (i: Int in 0 until num) {
-            var indexName = randomAlphaOfLength(10)
+            var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
             while (indexName.equals(alias) || indices.containsKey(indexName))
-                indexName = randomAlphaOfLength(10)
+                indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
             indices[indexName] = includeWriteIndex && i == writeIndex
         }
         return indices
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
index 4909d08ca..86eb3684d 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
@@ -1223,6 +1223,315 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
         }
     }
 
+    fun `test document-level monitor when datastreams contain docs that do match query`() {
+        val dataStreamName = "test-datastream"
+        createDataStream(
+            dataStreamName,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent(),
+            false
+        )
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery))
+
+        val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
+        val monitor = createMonitor(
+            randomDocumentLevelMonitor(
+                inputs = listOf(docLevelInput),
+                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
+            )
+        )
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "@timestamp": "$testTime",
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
+        indexDoc(dataStreamName, "1", testDoc)
+        var response = executeMonitor(monitor.id)
+        var output = entityAsMap(response)
+        var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        rolloverDatastream(dataStreamName)
+        indexDoc(dataStreamName, "2", testDoc)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        deleteDataStream(dataStreamName)
+    }
+
+    fun `test document-level monitor when datastreams contain docs across read-only indices that do match query`() {
+        val dataStreamName = "test-datastream"
+        createDataStream(
+            dataStreamName,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent(),
+            false
+        )
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery))
+
+        val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
+        val monitor = createMonitor(
+            randomDocumentLevelMonitor(
+                inputs = listOf(docLevelInput),
+                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
+            )
+        )
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "@timestamp": "$testTime",
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
+        indexDoc(dataStreamName, "1", testDoc)
+        var response = executeMonitor(monitor.id)
+        var output = entityAsMap(response)
+        var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        indexDoc(dataStreamName, "2", testDoc)
+        rolloverDatastream(dataStreamName)
+        rolloverDatastream(dataStreamName)
+        indexDoc(dataStreamName, "4", testDoc)
+        rolloverDatastream(dataStreamName)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
+
+        indexDoc(dataStreamName, "5", testDoc)
+        indexDoc(dataStreamName, "6", testDoc)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
+        deleteDataStream(dataStreamName)
+    }
+
+    fun `test document-level monitor when index alias contains docs that do match query`() {
+        val aliasName = "test-alias"
+        createIndexAlias(
+            aliasName,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent()
+        )
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery))
+
+        val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
+        val monitor = createMonitor(
+            randomDocumentLevelMonitor(
+                inputs = listOf(docLevelInput),
+                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
+            )
+        )
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "@timestamp": "$testTime",
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
+        indexDoc(aliasName, "1", testDoc)
+        var response = executeMonitor(monitor.id)
+        var output = entityAsMap(response)
+        var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        rolloverDatastream(aliasName)
+        indexDoc(aliasName, "2", testDoc)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        deleteIndexAlias(aliasName)
+    }
+
+    fun `test document-level monitor when multiple datastreams contain docs across read-only indices that do match query`() {
+        val dataStreamName1 = "test-datastream1"
+        createDataStream(
+            dataStreamName1,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent(),
+            false
+        )
+        val dataStreamName2 = "test-datastream2"
+        createDataStream(
+            dataStreamName2,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent(),
+            false
+        )
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "@timestamp": "$testTime",
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
+        indexDoc(dataStreamName2, "-1", testDoc)
+        rolloverDatastream(dataStreamName2)
+        indexDoc(dataStreamName2, "0", testDoc)
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf("test-datastream*"), listOf(docQuery))
+
+        val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
+        val monitor = createMonitor(
+            randomDocumentLevelMonitor(
+                inputs = listOf(docLevelInput),
+                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
+            )
+        )
+
+        indexDoc(dataStreamName1, "1", testDoc)
+        indexDoc(dataStreamName2, "1", testDoc)
+        var response = executeMonitor(monitor.id)
+        var output = entityAsMap(response)
+        var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
+
+        indexDoc(dataStreamName1, "2", testDoc)
+        indexDoc(dataStreamName2, "2", testDoc)
+        rolloverDatastream(dataStreamName1)
+        rolloverDatastream(dataStreamName1)
+        rolloverDatastream(dataStreamName2)
+        indexDoc(dataStreamName1, "4", testDoc)
+        indexDoc(dataStreamName2, "4", testDoc)
+        rolloverDatastream(dataStreamName1)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 4, matchingDocsToQuery.size)
+
+        indexDoc(dataStreamName1, "5", testDoc)
+        indexDoc(dataStreamName1, "6", testDoc)
+        indexDoc(dataStreamName2, "5", testDoc)
+        indexDoc(dataStreamName2, "6", testDoc)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 4, matchingDocsToQuery.size)
+        deleteDataStream(dataStreamName1)
+        deleteDataStream(dataStreamName2)
+    }
+
+    fun `test document-level monitor ignoring old read-only indices for datastreams`() {
+        val dataStreamName = "test-datastream"
+        createDataStream(
+            dataStreamName,
+            """
+                "properties" : {
+                  "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
+                  "test_field" : { "type" : "keyword" },
+                  "number" : { "type" : "keyword" }
+                }
+            """.trimIndent(),
+            false
+        )
+
+        val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
+        val testDoc = """{
+            "@timestamp": "$testTime",
+            "message" : "This is an error from IAD region",
+            "test_strict_date_time" : "$testTime",
+            "test_field" : "us-west-2"
+        }"""
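+        // Docs indexed before the monitor is created, including those in already rolled-over backing indices, should not be matched later.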
+        indexDoc(dataStreamName, "-1", testDoc)
+        rolloverDatastream(dataStreamName)
+        indexDoc(dataStreamName, "0", testDoc)
+
+        val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
+        val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery))
+
+        val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
+        val monitor = createMonitor(
+            randomDocumentLevelMonitor(
+                inputs = listOf(docLevelInput),
+                triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
+            )
+        )
+
+        indexDoc(dataStreamName, "1", testDoc)
+        var response = executeMonitor(monitor.id)
+        var output = entityAsMap(response)
+        var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        rolloverDatastream(dataStreamName)
+        indexDoc(dataStreamName, "2", testDoc)
+        response = executeMonitor(monitor.id)
+        output = entityAsMap(response)
+        searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
+        @Suppress("UNCHECKED_CAST")
+        matchingDocsToQuery = searchResult[docQuery.id] as List<String>
+        assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)
+
+        deleteDataStream(dataStreamName)
+    }
+
     fun `test execute monitor with non-null data sources`() {
 
         val testIndex = createTestIndex()
@@ -1305,7 +1614,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
         deleteIndex(index1)
         deleteIndex(index2)
 
-        indexDoc(index4, "1", testDoc)
+        indexDoc(index4, "2", testDoc)
         response = executeMonitor(monitor.id)
 
         output = entityAsMap(response)
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureDestinationRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureDestinationRestApiIT.kt
index d3650460b..bcf326499 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureDestinationRestApiIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureDestinationRestApiIT.kt
@@ -7,10 +7,10 @@ package org.opensearch.alerting.resthandler
 
 import org.apache.http.HttpHeaders
 import org.apache.http.message.BasicHeader
+import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix
 import org.junit.After
 import org.junit.Before
 import org.junit.BeforeClass
-import org.junit.Ignore
 import org.opensearch.alerting.ALERTING_GET_DESTINATION_ACCESS
 import org.opensearch.alerting.AlertingPlugin
 import org.opensearch.alerting.AlertingRestTestCase
@@ -30,7 +30,7 @@ import org.opensearch.test.junit.annotations.TestLogging
 import java.time.Instant
 
 // TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI
-@Ignore
+@AwaitsFix(bugUrl = "")
 @TestLogging("level:DEBUG", reason = "Debug for tests.")
 @Suppress("UNCHECKED_CAST")
 class SecureDestinationRestApiIT : AlertingRestTestCase() {
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailAccountRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailAccountRestApiIT.kt
index b8f13f296..0326e88cd 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailAccountRestApiIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailAccountRestApiIT.kt
@@ -9,10 +9,10 @@ import org.apache.http.HttpHeaders
 import org.apache.http.entity.ContentType
 import org.apache.http.entity.StringEntity
 import org.apache.http.message.BasicHeader
+import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix
 import org.junit.After
 import org.junit.Before
 import org.junit.BeforeClass
-import org.junit.Ignore
 import org.opensearch.alerting.ALERTING_GET_EMAIL_ACCOUNT_ACCESS
 import org.opensearch.alerting.ALERTING_NO_ACCESS_ROLE
 import org.opensearch.alerting.ALERTING_SEARCH_EMAIL_ACCOUNT_ACCESS
@@ -43,7 +43,7 @@ val SEARCH_EMAIL_ACCOUNT_DSL = """
 """.trimIndent()
 
 // TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI
-@Ignore
+@AwaitsFix(bugUrl = "")
 class SecureEmailAccountRestApiIT : AlertingRestTestCase() {
 
     companion object {
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailGroupsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailGroupsRestApiIT.kt
index 0c1fc85f6..83845cc38 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailGroupsRestApiIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureEmailGroupsRestApiIT.kt
@@ -9,10 +9,10 @@ import org.apache.http.HttpHeaders
 import org.apache.http.entity.ContentType
 import org.apache.http.entity.StringEntity
 import org.apache.http.message.BasicHeader
+import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix
 import org.junit.After
 import org.junit.Before
 import org.junit.BeforeClass
-import org.junit.Ignore
 import org.opensearch.alerting.ALERTING_GET_EMAIL_GROUP_ACCESS
 import org.opensearch.alerting.ALERTING_SEARCH_EMAIL_GROUP_ACCESS
 import org.opensearch.alerting.AlertingPlugin
@@ -42,7 +42,7 @@ val SEARCH_EMAIL_GROUP_DSL = """
 """.trimIndent()
 
 // TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI
-@Ignore
+@AwaitsFix(bugUrl = "")
 @TestLogging("level:DEBUG", reason = "Debug for tests.")
 @Suppress("UNCHECKED_CAST")
 class SecureEmailGroupsRestApiIT : AlertingRestTestCase() {
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt
index a52062d9d..6cc6b759e 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt
@@ -10,10 +10,10 @@ import org.apache.http.entity.ContentType
 import org.apache.http.entity.StringEntity
 import org.apache.http.message.BasicHeader
 import org.apache.http.nio.entity.NStringEntity
+import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix
 import org.junit.After
 import org.junit.Before
 import org.junit.BeforeClass
-import org.junit.Ignore
 import org.opensearch.alerting.ADMIN
 import org.opensearch.alerting.ALERTING_BASE_URI
 import org.opensearch.alerting.ALERTING_DELETE_MONITOR_ACCESS
@@ -68,7 +68,7 @@ import org.opensearch.search.builder.SearchSourceBuilder
 import org.opensearch.test.junit.annotations.TestLogging
 
 // TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI
-@Ignore
+@AwaitsFix(bugUrl = "")
 @TestLogging("level:DEBUG", reason = "Debug for tests.")
 @Suppress("UNCHECKED_CAST")
 class SecureMonitorRestApiIT : AlertingRestTestCase() {