optimize execution of workflow consisting of bucket-level followed by doc-level monitors #1729

Merged: 6 commits, Dec 11, 2024

Changes from 3 commits
@@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID
@@ -479,7 +480,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
-sr.source().query(queryBuilder)
+sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)
Member:
Why are we sorting based on _seq_no? There is already a range query based on the period_end variable; this seems incorrect.

Collaborator (Author):
We need the sort because, without it, we get an arbitrary 10 docs from the default last-15-minutes period for a workflow running every 1 minute, while the aggregation may have grouped 1000 docs. So the 10 docs returned may not be the latest ones; we pass those 10 docs to the delegated doc-level monitor, which has already moved its seq_no past these 10 random docs and hence does not generate an alert.

Sorting by _seq_no ensures we always get the latest 10 of the 1000 docs considered for aggregation. Thus, when the doc-level monitor runs next, it receives the latest 10 docs and goes on to generate an alert.
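
To make this concrete, a minimal sketch (editor's illustration; the helper name and the page size of 10 are assumptions, not the plugin's API) of a query built this way:

```kotlin
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// Hypothetical helper: build the source for sampling docs to hand to the
// chained doc-level monitor. If ~1000 docs fell into the triggering buckets
// but only 10 hits come back, an unsorted search returns an arbitrary 10;
// sorting by _seq_no descending returns the 10 most recently written docs,
// i.e. the ones the delegated monitor's seq_no watermark has not yet passed.
fun chainedSampleSource(fieldName: String, bucketValues: List<Any>): SearchSourceBuilder =
    SearchSourceBuilder()
        .query(QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(fieldName, bucketValues)))
        .size(10) // page size assumed for illustration
        .sort("_seq_no", SortOrder.DESC)
```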

}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
@@ -138,7 +138,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
-val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
+val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first
+val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second
Member:
Will this map each individual doc id to a finding id?

Collaborator (Author):
This will still have two parts: the first is the &lt;index to doc ids&gt; map, and the second is the list of finding ids.
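
For reference, the shape being described, with hypothetical index names and ids (editor's illustration):

```kotlin
fun demoChainedFindingsShape() {
    // First element: index name -> doc ids related to findings in that index.
    // Second element: flat list of the chained monitor's finding ids.
    val chainedFindings: Pair<Map<String, List<String>>, List<String>> = Pair(
        mapOf(
            "logs-000001" to listOf("docA", "docB"),
            "logs-000002" to listOf("docC")
        ),
        listOf("finding-1", "finding-2")
    )
    val (matchingDocIdsPerIndex, findingIdsForMatchingDocIds) = chainedFindings
    println(matchingDocIdsPerIndex["logs-000001"]) // [docA, docB]
    println(findingIdsForMatchingDocIds)           // [finding-1, finding-2]
}
```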


val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
@@ -226,6 +227,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
+findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
@@ -90,7 +90,7 @@ class InputService(
periodStart = periodStart,
periodEnd = periodEnd,
prevResult = prevResult,
-matchingDocIdsPerIndex = matchingDocIdsPerIndex,
+matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first,
returnSampleDocs = false
)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
@@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
-suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
+suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
+Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
-return emptyMap()
+return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
-if (existsResponse.isExists == false) return emptyMap()
+if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
@@ -83,7 +84,7 @@
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
-return indexToRelatedDocIdsMap
+return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
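A hedged sketch of consuming the widened return type (editor's illustration; the actual call site is in CompositeWorkflowRunner further below):

```kotlin
// Editor's illustration: destructure the Pair at the call site.
suspend fun logChainedFindings(
    workflowService: WorkflowService,
    chainedMonitors: List<Monitor>,
    executionId: String
) {
    val (indexToDocIds, findingIds) =
        workflowService.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
    // The map drives per-index doc-id filtering in the delegate doc-level monitor.
    indexToDocIds.forEach { (index, docIds) -> println("$index -> $docIds") }
    // The finding ids travel with the workflow run context so the delegate's
    // single alert can reference the chained findings.
    println("chained finding ids: $findingIds")
}
```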
@@ -297,19 +297,37 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
-monitor.triggers.forEach {
-triggerResults[it.id] = runForEachDocTrigger(
-monitorResult,
-it as DocumentLevelTrigger,
-monitor,
-idQueryMap,
-docsToQueries,
-queryToDocIds,
-dryrun,
-executionId = executionId,
-findingIdToDocSource,
-workflowRunContext = workflowRunContext
-)
+/**
+ * If the should_persist_findings_and_alerts flag is not set, each doc-level trigger
+ * generates per-document alerts as before; otherwise each trigger generates a
+ * single alert referencing multiple findings.
+ */
+if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
+monitor.triggers.forEach {
+triggerResults[it.id] = runForEachDocTrigger(
+monitorResult,
+it as DocumentLevelTrigger,
+monitor,
+idQueryMap,
+docsToQueries,
+queryToDocIds,
+dryrun,
+executionId = executionId,
+findingIdToDocSource,
+workflowRunContext = workflowRunContext
+)
+}
+} else if (monitor.shouldPersistFindingsAndAlerts == true) {
+monitor.triggers.forEach {
+triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts(
+monitorResult,
+it as DocumentLevelTrigger,
+monitor,
+queryToDocIds,
+dryrun,
+executionId,
+workflowRunContext
+)
+}
+}
}

@@ -349,6 +367,58 @@ class TransportDocLevelMonitorFanOutAction
}
}

/**
 * Runs doc-level triggers without persisting findings or per-document alerts,
 * generating a single alert instead.
 */
private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) {
workflowRunContext.matchingDocIdsPerIndex.second
} else {
listOf()
}
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
findingIds,
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
executionId = executionId,
workflorwRunContext = workflowRunContext
)
for (action in trigger.actions) {
this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
}

if (!dryrun && monitor.id != Monitor.NO_ID) {
val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
val actionExecutionResults = actionResults.values.map { actionRunResult ->
ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
}
val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)

retryPolicy.let {
alertService.saveAlerts(
monitor.dataSources,
listOf(updatedAlert),
it,
routingId = monitor.id
)
}
}
}
return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
}

private suspend fun runForEachDocTrigger(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
@@ -512,7 +582,7 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

-if (shouldCreateFinding) {
+if (shouldCreateFinding && (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
@@ -524,13 +594,15 @@
bulkIndexFindings(monitor, indexRequests)
}

-try {
-findings.forEach { finding ->
-publishFinding(monitor, finding)
-}
-} catch (e: Exception) {
-// suppress exception
-log.error("Optional finding callback failed", e)
-}
+if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
+try {
+findings.forEach { finding ->
+publishFinding(monitor, finding)
+}
+} catch (e: Exception) {
+// suppress exception
+log.error("Optional finding callback failed", e)
+}
+}
this.findingsToTriggeredQueries += findingsToTriggeredQueries

@@ -688,6 +760,7 @@
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
+monitor,
indexExecutionCtx.concreteIndexName,
shard,
from,
@@ -870,6 +943,7 @@
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
+monitor: Monitor,
index: String,
shard: String,
prevSeqNo: Long?,
@@ -883,8 +957,16 @@
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

-if (!docIds.isNullOrEmpty()) {
-boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
-}
+if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
+if (!docIds.isNullOrEmpty()) {
+boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
+}
+} else if (monitor.shouldPersistFindingsAndAlerts == true) {
+val docIdsParam = mutableListOf<String>()
+if (docIds != null) {
+docIdsParam.addAll(docIds)
+}
+boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
+}

val request: SearchRequest = SearchRequest()
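For context on the hunk above: searchShard pages through a shard using _seq_no as a watermark, so only documents not yet queried are fetched, and the chained doc ids are layered on as an _id filter. A simplified, self-contained sketch (editor's illustration; names are not the plugin's API):

```kotlin
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders

// Simplified per-shard query: the _seq_no range (prevSeqNo, maxSeqNo] is the
// watermark that skips already-processed documents; docIds restricts the fetch
// to documents the chained bucket-level monitor matched. Note that a terms
// query over an empty id list matches no documents, which is why the
// flag == true branch above always applies the filter, even for empty docIds.
fun shardWatermarkQuery(prevSeqNo: Long?, maxSeqNo: Long, docIds: List<String>?): BoolQueryBuilder {
    val query = BoolQueryBuilder()
        .filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
    if (!docIds.isNullOrEmpty()) {
        query.filter(QueryBuilders.termsQuery("_id", docIds))
    }
    return query
}
```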
@@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var lastErrorDelegateRun: Exception? = null

for (delegate in delegates) {
-var indexToDocIds = mapOf<String, List<String>>()
+var indexToDocIdsWithFindings: Pair<Map<String, List<String>>, List<String>>? = Pair(mapOf(), listOf())
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw AlertingException.wrap(
@@ -118,7 +118,7 @@
}

try {
-indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
+indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand All @@ -131,7 +131,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
workflowId = workflowMetadata.workflowId,
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
-matchingDocIdsPerIndex = indexToDocIds,
+matchingDocIdsPerIndex = indexToDocIdsWithFindings!!,
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
else workflow.auditDelegateMonitorAlerts!!
)
@@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
)
)
}

fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() {
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field_1").field("test_field_1")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
// Bucket-level monitor will reduce the matched doc ids to those that belong
// to a bucket containing more than one document after term grouping
val triggerScript = """
params.docCount > 1
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null,
)
)
val bucketCustomAlertsIndex = "custom_alerts_index"
val bucketCustomFindingsIndex = "custom_findings_index"
val bucketCustomFindingsIndexPattern = "custom_findings_index-1"

val bucketLevelMonitorResponse = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(
findingsEnabled = true,
alertsIndex = bucketCustomAlertsIndex,
findingsIndex = bucketCustomFindingsIndex,
findingsIndexPattern = bucketCustomFindingsIndexPattern
)
)
)!!

val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf())
val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf())
val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3))
val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val docCustomAlertsIndex = "custom_alerts_index"
val docCustomFindingsIndex = "custom_findings_index"
val docCustomFindingsIndexPattern = "custom_findings_index-1"
var docLevelMonitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(docTrigger),
dataSources = DataSources(
alertsIndex = docCustomAlertsIndex,
findingsIndex = docCustomFindingsIndex,
findingsIndexPattern = docCustomFindingsIndexPattern
),
ignoreFindingsAndAlerts = true
)

val docLevelMonitorResponse = createMonitor(docLevelMonitor)!!
// 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor)
var workflow = randomWorkflow(
monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id),
enabled = false,
auditDelegateMonitorAlerts = false
)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
assertNotNull(workflowById)

// Creates 5 documents
insertSampleTimeSerializedData(
index,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2",
"test_value_2",
"test_value_3"
)
)

val workflowId = workflowResponse.id
// 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4)
// 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth)
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
assertNotNull(executeWorkflowResponse)

for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) {
if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) {
val searchResult = monitorRunResults.inputResults.results.first()

@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
?.get("buckets") as List<kotlin.collections.Map<String, Any>>
assertEquals("Incorrect search result", 3, buckets.size)

val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId)
assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2)
assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4"))
} else {
assertEquals(1, monitorRunResults.inputResults.results.size)
val values = monitorRunResults.triggerResults.values
assertEquals(1, values.size)

val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId)
assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1)
assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4"))
}
}
}
}