Skip to content

Commit

Permalink
clean up doc level queries on dry run (opensearch-project#1430)
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored and eirsep committed Mar 13, 2024
1 parent 9d1f91e commit 83a55f3
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
} else {
// Clean up any queries created by the dry run monitor
monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
Expand All @@ -39,7 +42,14 @@ import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java)

Expand Down Expand Up @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
return true
}

suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) {
try {
monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) ->
val indicesExistsResponse: IndicesExistsResponse =
client.suspendUntil {
client.admin().indices().exists(IndicesExistsRequest(queryIndex), it)
}
if (indicesExistsResponse.isExists == false) {
return
}

val queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.existsQuery("monitor_id"))
.mustNot(QueryBuilders.wildcardQuery("monitor_id", "*"))

val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(queryIndex)
.filter(queryBuilder)
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
)
}
response.bulkFailures.forEach {
log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ")
}
}
} catch (e: Exception) {
log.error("Failed to delete doc level queries on dry run", e)
}
}

fun docLevelQueryIndexExists(dataSources: DataSources): Boolean {
val clusterState = clusterService.state()
return clusterState.metadata.hasAlias(dataSources.queryIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,168 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

val index = createTestIndex()

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()

val docQuery =
DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field"))
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(0, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() {
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false")
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
Expand Down Expand Up @@ -105,6 +266,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex"))

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) }
}

fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex"))
assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex"))

// ensure doc level query is deleted on dry run
val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) }

// create and execute second monitor not as dryrun
val testIndex2 = createTestIndex("test1")
val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc2 = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime2",
"test_field" : "us-east-1"
}"""

val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf())
val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2))

val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2)))
assertNotNull(monitor2.id)

indexDoc(testIndex2, "1", testDoc2)
indexDoc(testIndex2, "5", testDoc2)

val response2 = executeMonitor(monitor2.id)
val output2 = entityAsMap(response2)

assertEquals(monitor2.name, output2["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult2 = (output2.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size)
assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2")))

val alerts = searchAlertsWithFilter(monitor2)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor2)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))

// ensure query from second monitor was saved
val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"")
httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.forEach { hit ->
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
assertTrue(expectedQueries.contains(query))
}
searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) }
}

fun `test execute monitor generates alerts and findings`() {
Expand Down

0 comments on commit 83a55f3

Please sign in to comment.