diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 596f59a43..86fd18ff0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -67,6 +67,7 @@ import java.io.IOException import java.time.Instant import java.util.UUID import java.util.concurrent.atomic.AtomicLong +import java.util.stream.Collectors import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -153,12 +154,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - /* Contains list of docs source that in memory to submit to percolate query against query index. + /* Contains list of docs source that are held in memory to submit to percolate query against query index. * Docs are fetched from the source index per shard and transformed.*/ - val transformedDocs = mutableListOf>() + val transformedDocs = mutableListOf>() val docsSizeInBytes = AtomicLong(0) - var lastUpdatedIndexName: String? = null - var lastConcreteIndexName: String? 
= null + val concreteIndicesSeenSoFar = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( @@ -181,8 +181,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } } + concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") - lastUpdatedIndexName = updatedIndexName +// lastUpdatedIndexName = updatedIndexName val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -190,7 +191,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices.forEach { concreteIndexName -> // Prepare lastRunContext for each index - lastConcreteIndexName = concreteIndexName +// lastConcreteIndexName = concreteIndexName val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { val isIndexCreatedRecently = createdRecently( monitor, @@ -232,7 +233,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - fetchDataAndExecutePercolateQueriesPerShard( + fetchShardDataAndMaybeExecutePercolateQueries( monitor, monitorCtx, docExecutionContext, @@ -244,10 +245,27 @@ object DocumentLevelMonitorRunner : MonitorRunner() { inputRunResults, docsToQueries, transformedDocs, - docsSizeInBytes + docsSizeInBytes, + docLevelMonitorInput.indices, + concreteIndicesSeenSoFar ) } } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end*/ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + docLevelMonitorInput.indices, + concreteIndicesSeenSoFar, + inputRunResults, + docsToQueries + ) + } monitorResult = monitorResult.copy(inputResults = 
InputRunResults(listOf(inputRunResults))) /* @@ -593,12 +611,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough) - * 2. Transform documents to submit to percolate query - * 3. perform percolate queries - * 4. update docToQueries Map with all hits from percolate queries - * */ - private suspend fun fetchDataAndExecutePercolateQueriesPerShard( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, @@ -609,8 +627,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - transformedDocs: MutableList>, + transformedDocs: MutableList>, docsSizeInBytes: AtomicLong, + monitorInputIndices: List, + concreteIndices: List, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -645,66 +665,50 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if (transformedDocs.isNotEmpty() || isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { + if (transformedDocs.isNotEmpty() && isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) { performPercolateQueryAndResetCounters( monitorCtx, transformedDocs, docsSizeInBytes, monitor, monitorMetadata, - indexName, - concreteIndexName, + 
monitorInputIndices, + concreteIndices, inputRunResults, docsToQueries ) } } - /* if all shards are covered still in-memory docs size limit is not breached we would need to submit - the percolate query at the end*/ - if (transformedDocs.isNotEmpty()) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - indexName, - concreteIndexName, - inputRunResults, - docsToQueries - ) - } } private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, - transformedDocs: MutableList>, + transformedDocs: MutableList>, docsSizeInBytes: AtomicLong, monitor: Monitor, monitorMetadata: MonitorMetadata, - indexName: String, - concreteIndexName: String, + monitorInputIndices: List, + concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( monitorCtx, - transformedDocs.map { it.second }, + transformedDocs, monitor, monitorMetadata, - indexName, - concreteIndexName + concreteIndices, + monitorInputIndices ) percolateQueryResponseHits.forEach { hit -> - val id = hit.id - .replace("_${indexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } docIndices.forEach { idx -> - val docIndex = "${transformedDocs[idx].first}|$concreteIndexName" + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } @@ -761,34 +765,42 @@ object DocumentLevelMonitorRunner : MonitorRunner() { /** Executes percolate query on the docs against the monitor's 
query index and return the hits from the search response*/ private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, - docs: List<BytesReference>, + docs: MutableList<Pair<String, TransformedDocDto>>, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String, - concreteIndex: String, + concreteIndices: List<String>, + monitorInputIndices: List<String>, ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) - - val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.termsQuery("index", indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) } boolQueryBuilder.filter(percolateQueryBuilder) - - val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] - if (queryIndex == null) { - val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" 
+ + " sourceIndices: $monitorInputIndices" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type()) + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - logger.debug("Monitor ${monitor.id}: Executing percolate query for docs from source index $index against query index $queryIndex") + logger.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -796,15 +808,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " + - "and queryIndex [$queryIndex] for ${docs.size} document(s)", + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", e ) } if (response.status() !== RestStatus.OK) { throw IOException( - "Monitor ${monitor.id}: Failed to search percolate index: $queryIndex. Response status is ${response.status()}" + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" ) } return response.hits @@ -819,8 +833,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorId: String, conflictingFields: List, docsSizeInBytes: AtomicLong, - ): List> { - return hits.mapNotNull(fun(hit: SearchHit): Pair? 
{ + ): List<Pair<String, TransformedDocDto>> { + return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? { try { val sourceMap = hit.sourceAsMap transformDocumentFieldNames( @@ -833,7 +847,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) - return Pair(hit.id, sourceRef) + return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) } catch (e: Exception) { logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. @@ -903,9 +917,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (thresholdPercentage > 100 || thresholdPercentage < 0) { thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings) } - val heapMaxBytes = JvmStats.jvmStats().mem.heapMax.bytes + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes return docsBytesSize > thresholdBytes } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. + */ + private data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference + ) }