diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index 87b2fb985..979a156cd 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -8,8 +8,10 @@ package org.opensearch.alerting
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.DocWriteRequest
+import org.opensearch.action.bulk.BulkRequest
+import org.opensearch.action.bulk.BulkResponse
 import org.opensearch.action.index.IndexRequest
-import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
@@ -86,7 +88,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         logger.debug("Document-level-monitor is running ...")
         val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
         var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
-
+        var nonPercolateSearchesTimeTaken = AtomicLong(0)
+        var percolateQueriesTimeTaken = AtomicLong(0)
+        var totalDocsQueried = AtomicLong(0)
         try {
             monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
             monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
@@ -208,7 +212,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     val indexUpdatedRunContext = updateLastRunContext(
                         indexLastRunContext.toMutableMap(),
                         monitorCtx,
-                        concreteIndexName
+                        concreteIndexName,
+                        nonPercolateSearchesTimeTaken
                     ) as MutableMap<String, Any>
                     if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
                         IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
@@ -265,7 +270,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                                 docsSizeInBytes,
                                 updatedIndexNames,
                                 concreteIndicesSeenSoFar,
-                                ArrayList(fieldsToBeQueried)
+                                ArrayList(fieldsToBeQueried),
+                                nonPercolateSearchesTimeTaken,
+                                percolateQueriesTimeTaken,
+                                totalDocsQueried
                             )
                         }
                     }
@@ -281,7 +289,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     updatedIndexNames,
                     concreteIndicesSeenSoFar,
                     inputRunResults,
-                    docsToQueries
+                    docsToQueries,
+                    percolateQueriesTimeTaken,
+                    totalDocsQueried
                 )
             }
             val took = System.currentTimeMillis() - queryingStartTimeMillis
@@ -305,11 +315,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             // If there are no triggers defined, we still want to generate findings
             if (monitor.triggers.isEmpty()) {
                 if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                    logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor $monitor.id")
-                    docsToQueries.forEach {
-                        val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-                        createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
-                    }
+                    logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
+                    createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
                 }
             } else {
                 monitor.triggers.forEach {
@@ -363,6 +370,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                 e
             )
             return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
+        } finally {
+            logger.error(
+                "PERF_DEBUG_STATS: Monitor ${monitor.id} " +
+                    "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
+            )
+            logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken")
+            logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried")
         }
     }
@@ -401,7 +415,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         trigger: DocumentLevelTrigger,
         monitor: Monitor,
         idQueryMap: Map<String, DocLevelQuery>,
-        docsToQueries: Map<String, MutableList<String>>,
+        docsToQueries: MutableMap<String, MutableList<String>>,
         queryToDocIds: Map<DocLevelQuery, Set<String>>,
         dryrun: Boolean,
         workflowRunContext: WorkflowRunContext?,
@@ -410,35 +424,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

-        val findings = mutableListOf<String>()
-        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

         // TODO: Implement throttling for findings
-        docsToQueries.forEach {
-            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
-            val findingId = createFindings(
-                monitor,
-                monitorCtx,
-                triggeredQueries,
-                it.key,
-                !dryrun && monitor.id != Monitor.NO_ID,
-                executionId
-            )
-            findings.add(findingId)
+        val findingToDocPairs = createFindings(
+            monitor,
+            monitorCtx,
+            docsToQueries,
+            idQueryMap,
+            !dryrun && monitor.id != Monitor.NO_ID,
+            executionId
+        )

-            if (triggerResult.triggeredDocs.contains(it.key)) {
-                findingDocPairs.add(Pair(findingId, it.key))
+        findingToDocPairs.forEach {
+            // Only pick those entries whose docs have triggers associated with them
+            if (triggerResult.triggeredDocs.contains(it.second)) {
+                triggerFindingDocPairs.add(Pair(it.first, it.second))
             }
         }

         val actionCtx = triggerCtx.copy(
             triggeredDocs = triggerResult.triggeredDocs,
-            relatedFindings = findings,
+            relatedFindings = findingToDocPairs.map { it.first },
             error = monitorResult.error ?: triggerResult.error
         )

         val alerts = mutableListOf<Alert>()
-        findingDocPairs.forEach {
+        triggerFindingDocPairs.forEach {
             val alert = monitorCtx.alertService!!.composeDocLevelAlert(
                 listOf(it.first),
                 listOf(it.second),
@@ -500,47 +512,65 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
     private suspend fun createFindings(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docLevelQueries: List<DocLevelQuery>,
-        matchingDocId: String,
+        docsToQueries: MutableMap<String, MutableList<String>>,
+        idQueryMap: Map<String, DocLevelQuery>,
         shouldCreateFinding: Boolean,
         workflowExecutionId: String? = null,
-    ): String {
-        // Before the "|" is the doc id and after the "|" is the index
-        val docIndex = matchingDocId.split("|")
+    ): List<Pair<String, String>> {

-        val finding = Finding(
-            id = UUID.randomUUID().toString(),
-            relatedDocIds = listOf(docIndex[0]),
-            correlatedDocIds = listOf(docIndex[0]),
-            monitorId = monitor.id,
-            monitorName = monitor.name,
-            index = docIndex[1],
-            docLevelQueries = docLevelQueries,
-            timestamp = Instant.now(),
-            executionId = workflowExecutionId
-        )
+        val findingDocPairs = mutableListOf<Pair<String, String>>()
+        val findings = mutableListOf<Finding>()
+        val indexRequests = mutableListOf<IndexRequest>()

-        val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
-        logger.debug("Findings: $findingStr")
+        docsToQueries.forEach {
+            val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

-        if (shouldCreateFinding) {
-            val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .source(findingStr, XContentType.JSON)
-                .id(finding.id)
-                .routing(finding.id)
+            // Before the "|" is the doc id and after the "|" is the index
+            val docIndex = it.key.split("|")

-            monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
-                monitorCtx.client!!.index(indexRequest, it)
+            val finding = Finding(
+                id = UUID.randomUUID().toString(),
+                relatedDocIds = listOf(docIndex[0]),
+                correlatedDocIds = listOf(docIndex[0]),
+                monitorId = monitor.id,
+                monitorName = monitor.name,
+                index = docIndex[1],
+                docLevelQueries = triggeredQueries,
+                timestamp = Instant.now(),
+                executionId = workflowExecutionId
+            )
+            findingDocPairs.add(Pair(finding.id, it.key))
+            findings.add(finding)
+
+            val findingStr =
+                finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
+                    .string()
+            logger.debug("Findings: $findingStr")
+
+            if (shouldCreateFinding) {
+                indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
+                    .source(findingStr, XContentType.JSON)
+                    .id(finding.id)
+                    .routing(finding.id)
+                    .opType(DocWriteRequest.OpType.CREATE)
             }
         }

-        try {
-            publishFinding(monitor, monitorCtx, finding)
-        } catch (e: Exception) {
-            // suppress exception
+        if (indexRequests.isNotEmpty()) {
+            val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
+                bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
+            }
+            if (bulkResponse.hasFailures()) {
+                bulkResponse.items.forEach { item ->
+                    if (item.isFailed) {
+                        logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
+                    }
+                }
+            } else {
+                logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
+            }
         }
-        return finding.id
+        return findingDocPairs
     }

     private fun publishFinding(
@@ -563,13 +593,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
     private suspend fun updateLastRunContext(
         lastRunContext: Map<String, Any>,
         monitorCtx: MonitorRunnerExecutionContext,
-        index: String
+        index: String,
+        nonPercolateSearchesTimeTaken: AtomicLong
     ): Map<String, Any> {
         val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
         val updatedLastRunContext = lastRunContext.toMutableMap()
         for (i: Int in 0 until count) {
             val shard = i.toString()
-            val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
+            val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken)
             updatedLastRunContext[shard] = maxSeqNo.toString()
         }
         return updatedLastRunContext
@@ -606,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
      * Get the current max seq number of the shard. We find it by searching the last document
      * in the primary shard.
      */
-    private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
+    private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long {
         val request: SearchRequest = SearchRequest()
             .indices(index)
             .preference("_shards:$shard")
@@ -625,7 +656,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         }
         if (response.hits.hits.isEmpty())
             return -1L
-
+        nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
         return response.hits.hits[0].seqNo
     }
@@ -654,7 +685,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         docsSizeInBytes: AtomicLong,
         monitorInputIndices: List<String>,
         concreteIndices: List<String>,
-        fieldsToBeQueried: List<String>
+        fieldsToBeQueried: List<String>,
+        nonPercolateSearchesTimeTaken: AtomicLong,
+        percolateQueriesTimeTaken: AtomicLong,
+        totalDocsQueried: AtomicLong
     ) {
         val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
         for (i: Int in 0 until count) {
@@ -678,7 +712,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                         to,
                         null,
                         docIds,
-                        fieldsToBeQueried
+                        fieldsToBeQueried,
+                        nonPercolateSearchesTimeTaken
                     )
                     from = to + 1
                     numDocsLeftToQueryFromShard -= 10000
@@ -706,7 +741,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     monitorInputIndices,
                     concreteIndices,
                     inputRunResults,
-                    docsToQueries
+                    docsToQueries,
+                    percolateQueriesTimeTaken,
+                    totalDocsQueried
                 )
             }
             logger.error(
@@ -743,6 +780,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         concreteIndices: List<String>,
         inputRunResults: MutableMap<String, MutableSet<String>>,
         docsToQueries: MutableMap<String, MutableList<String>>,
+        percolateQueriesTimeTaken: AtomicLong,
+        totalDocsQueried: AtomicLong
     ) {
         try {
             val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
@@ -751,7 +790,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                 monitor,
                 monitorMetadata,
                 concreteIndices,
-                monitorInputIndices
+                monitorInputIndices,
+                percolateQueriesTimeTaken
             )

             percolateQueryResponseHits.forEach { hit ->
@@ -765,6 +805,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
                 }
             }
+            totalDocsQueried.getAndAdd(transformedDocs.size.toLong())
         } finally { // no catch block because exception is caught and handled in runMonitor() class
             transformedDocs.clear()
             docsSizeInBytes.set(0)
@@ -783,6 +824,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         query: String?,
         docIds: List<String>? = null,
         fieldsToFetch: List<String>,
+        nonPercolateSearchesTimeTaken: AtomicLong,
     ): SearchHits {
         if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
             return SearchHits.empty()
@@ -819,6 +861,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         if (response.status() !== RestStatus.OK) {
             throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
         }
+        nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
         return response.hits
     }

@@ -830,6 +873,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         monitorMetadata: MonitorMetadata,
         concreteIndices: List<String>,
         monitorInputIndices: List<String>,
+        percolateQueriesTimeTaken: AtomicLong
     ): SearchHits {
         val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
         val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices))
@@ -881,8 +925,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                 "Response status is ${response.status()}"
             )
         }
-        logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
-        logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}")
+        logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
+        logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response")
+        percolateQueriesTimeTaken.getAndAdd(response.took.millis)
         return response.hits
     }
     /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/
@@ -907,7 +952,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val sourceMap = if (hit.hasSource()) {
             hit.sourceAsMap
         } else {
-            logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only")
            constructSourceMapFromFieldsInHit(hit)
         }
         transformDocumentFieldNames(
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
index 49743b3f0..12e0c481b 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
@@ -189,6 +189,18 @@ class TransportIndexMonitorAction @Inject constructor(
                 else (it as DocLevelMonitorInput).indices
             indices.addAll(inputIndices)
         }
+        if (
+            indices.size == 1 && (
+                IndexUtils.isAlias(indices[0], clusterService.state()) ||
+                    IndexUtils.isDataStream(indices[0], clusterService.state())
+                )
+        ) {
+            val metadata = clusterService.state().metadata.indicesLookup[indices[0]]?.writeIndex
+            if (metadata != null) {
+                indices.removeAt(0)
+                indices.add(metadata.index.name)
+            }
+        }
         val searchRequest = SearchRequest().indices(*indices.toTypedArray())
             .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
         client.search(
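---

For readers skimming the patch: the core change in createFindings is replacing one IndexRequest round trip per finding (each with an IMMEDIATE refresh) with a single BulkRequest that carries every finding and refreshes once. Below is a minimal, standalone Kotlin sketch of that pattern. FindingDoc and indexFindingsBulk are illustrative names, not part of the plugin, and the sketch uses the blocking actionGet() on a plain org.opensearch.client.Client rather than the plugin's internal suspendUntil coroutine helper.

import org.apache.logging.log4j.LogManager
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.Client
import org.opensearch.common.xcontent.XContentType

private val log = LogManager.getLogger("FindingsBulkIndexer")

/** One finding, already serialized to JSON. Illustrative type, not from the plugin. */
data class FindingDoc(val id: String, val json: String)

fun indexFindingsBulk(client: Client, findingsIndex: String, findings: List<FindingDoc>) {
    if (findings.isEmpty()) return
    val bulkRequest = BulkRequest()
        // One refresh for the whole batch instead of one per finding.
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
    findings.forEach { finding ->
        bulkRequest.add(
            IndexRequest(findingsIndex)
                .source(finding.json, XContentType.JSON)
                .id(finding.id)
                .routing(finding.id)
                // CREATE surfaces duplicate finding ids as per-item failures
                // instead of silently overwriting existing documents.
                .opType(DocWriteRequest.OpType.CREATE)
        )
    }
    // Bulk failures are partial: inspect each item, not just the response as a whole.
    val response = client.bulk(bulkRequest).actionGet()
    response.items.filter { it.isFailed }.forEach { item ->
        log.warn("Failed to index finding ${item.id}: ${item.failureMessage}")
    }
}

Combined with switching the op type to CREATE, a re-run of the same execution cannot silently overwrite previously indexed findings; duplicates show up as per-item bulk failures instead.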