Skip to content

Commit

Permalink
Bulk index findings and sequentially invoke auto-correlations
Browse files Browse the repository at this point in the history
Signed-off-by: Megha Goyal <[email protected]>
  • Loading branch information
goyamegh committed Dec 27, 2023
1 parent 526433a commit e7ba6d7
Showing 1 changed file with 80 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -273,10 +275,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
Expand Down Expand Up @@ -365,7 +364,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
trigger: DocumentLevelTrigger,
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
Expand All @@ -374,35 +373,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

val findings = mutableListOf<String>()
val findingDocPairs = mutableListOf<Pair<String, String>>()
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)
findings.add(findingId)
val findingToDocPairs = createFindings(
monitor,
monitorCtx,
docsToQueries,
idQueryMap,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)

if (triggerResult.triggeredDocs.contains(it.key)) {
findingDocPairs.add(Pair(findingId, it.key))
findingToDocPairs.forEach {
// Only pick those entries whose docs have triggers associated with them
if (triggerResult.triggeredDocs.contains(it.second)) {
triggerFindingDocPairs.add(Pair(it.first, it.second))
}
}

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
relatedFindings = findings,
// confirm if this is right or only trigger-able findings should be present in this list
relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)

val alerts = mutableListOf<Alert>()
findingDocPairs.forEach {
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
listOf(it.second),
Expand Down Expand Up @@ -461,51 +459,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return triggerResult
}

/**
* 1. Bulk index all findings based on shouldCreateFinding flag
* 2. invoke publishFinding() to kickstart auto-correlations
* 3. Returns a list of pairs for finding id to doc id
*/
private suspend fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
docsToQueries: MutableMap<String, MutableList<String>>,
idQueryMap: Map<String, DocLevelQuery>,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
): List<Pair<String, String>> {

val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

// Before the "|" is the doc id and after the "|" is the index
val docIndex = it.key.split("|")

if (shouldCreateFinding) {
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = triggeredQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
logger.debug("Findings: $findingStr")

if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
.opType(DocWriteRequest.OpType.INDEX)
}
}

monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
monitorCtx.client!!.index(indexRequest, it)
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}

try {
publishFinding(monitor, monitorCtx, finding)
findings.forEach { finding ->
publishFinding(monitor, monitorCtx, finding)
}
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return finding.id
return findingDocPairs
}

private fun publishFinding(
Expand Down

0 comments on commit e7ba6d7

Please sign in to comment.