Skip to content

Commit

Permalink
Add integ tests to test bulk index findings
Browse files Browse the repository at this point in the history
Signed-off-by: Megha Goyal <[email protected]>
  • Loading branch information
goyamegh committed Feb 6, 2024
1 parent ae32748 commit 1833d6d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -531,11 +530,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
indexRequests: List<IndexRequest>
) {
monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)

indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests), it)
bulk(BulkRequest().add(batch), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
Expand Down Expand Up @@ -170,6 +171,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon

monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings)

monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) {
monitorCtx.findingsIndexBatchSize = it
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,53 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
}

fun `test execute monitor for bulk index findings`() {
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
val testQueryName = "wildcard-test-query"
val testIndex = createTestIndex("${testIndexPrefix}1")
val testIndex2 = createTestIndex("${testIndexPrefix}2")

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

for (i in 0 until 9) {
indexDoc(testIndex, i.toString(), testDoc)
}
indexDoc(testIndex2, "3", testDoc)
adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Correct search result", 10, matchingDocsToQuery.size)
assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2")))

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 10, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 10, findings.size)
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") }
assertEquals("Found findings for all docs", 4, foundFindings.size)
}

fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() {
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
val testQueryName = "wildcard-test-query"
Expand Down

0 comments on commit 1833d6d

Please sign in to comment.