diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 979a156cd..daeb22945 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -16,8 +16,8 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest -import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult @@ -60,7 +60,8 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders -import org.opensearch.monitor.jvm.JvmStats +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indices.IndexClosedException import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits @@ -91,6 +92,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var nonPercolateSearchesTimeTaken = AtomicLong(0) var percolateQueriesTimeTaken = AtomicLong(0) var totalDocsQueried = AtomicLong(0) + var docTransformTimeTaken = AtomicLong(0) try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -209,11 +211,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } // Prepare updatedLastRunContext for each index - val 
indexUpdatedRunContext = updateLastRunContext( + val indexUpdatedRunContext = initializeNewLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, - concreteIndexName, - nonPercolateSearchesTimeTaken + concreteIndexName ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -251,18 +252,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } fieldsToBeQueried.addAll(it.queryFieldNames) } - - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - - fetchShardDataAndMaybeExecutePercolateQueries( - monitor, - monitorCtx, - docExecutionContext, + logger.error("PERF_DEBUG: Monitor ${monitor.id} Query field names: ${fieldsToBeQueried.joinToString()}") + val indexExecutionContext = IndexExecutionContext( + queries, + indexLastRunContext, + indexUpdatedRunContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + ) + + fetchShardDataAndMaybeExecutePercolateQueries( + monitor, + monitorCtx, + indexExecutionContext, monitorMetadata, inputRunResults, docsToQueries, @@ -273,8 +277,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ArrayList(fieldsToBeQueried), nonPercolateSearchesTimeTaken, percolateQueriesTimeTaken, - totalDocsQueried - ) + totalDocsQueried, + docTransformTimeTaken + ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number + indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + } } } /* if all indices are covered still in-memory docs size limit is not breached we would need to submit @@ -295,7 +302,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } val took = System.currentTimeMillis() - queryingStartTimeMillis - logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in 
$executionId") + logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in $executionId") monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -376,6 +383,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" ) logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken") + logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on transforming doc fields in millis: $docTransformTimeTaken") logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried") } } @@ -590,18 +598,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } - private suspend fun updateLastRunContext( + private fun initializeNewLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, index: String, - nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) - updatedLastRunContext[shard] = maxSeqNo.toString() + updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString() } return updatedLastRunContext } @@ -650,7 +656,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(1) ) val response: SearchResponse = client.suspendUntil { client.search(request, it) } - JvmStats.jvmStats() if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } @@ -673,11 +678,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docExecutionCtx: 
DocumentExecutionContext, - indexName: String, - concreteIndexName: String, - conflictingFields: List, - docIds: List? = null, + indexExecutionCtx: IndexExecutionContext, monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, @@ -688,43 +689,47 @@ object DocumentLevelMonitorRunner : MonitorRunner() { fieldsToBeQueried: List, nonPercolateSearchesTimeTaken: AtomicLong, percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong + totalDocsQueried: AtomicLong, + docTransformTimeTake: AtomicLong, + updateLastRunContext: (String, String) -> Unit ) { - val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int + val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { val shard = i.toString() try { - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { - continue - } - var from = prevSeqNo ?: 0 - var numDocsLeftToQueryFromShard = maxSeqNo - from - - while (numDocsLeftToQueryFromShard > 0) { - val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000 + val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED + var to: Long = Long.MAX_VALUE + while (to >= from) { val hits: SearchHits = searchShard( monitorCtx, - concreteIndexName, + indexExecutionCtx.concreteIndexName, shard, from, to, - null, - docIds, + indexExecutionCtx.docIds, fieldsToBeQueried, nonPercolateSearchesTimeTaken ) - from = to + 1 - numDocsLeftToQueryFromShard -= 10000 - val startTime = Instant.now() + if (hits.hits.isEmpty()) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) + break + } + if (to == Long.MAX_VALUE) { // max sequence number of shard 
needs to be computed + + updateLastRunContext(shard, hits.hits[0].seqNo.toString()) + to = hits.hits[0].seqNo - 10000L + } else { + to -= 10000L + } + val startTime = System.currentTimeMillis() transformedDocs.addAll( transformSearchHitsAndReconstructDocs( hits, - indexName, - concreteIndexName, + indexExecutionCtx.indexName, + indexExecutionCtx.concreteIndexName, monitor.id, - conflictingFields, + indexExecutionCtx.conflictingFields, docsSizeInBytes ) ) @@ -746,17 +751,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() { totalDocsQueried ) } - logger.error( - "PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " + - "took ${Instant.now().epochSecond - startTime.epochSecond}" - ) + docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) } } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + - " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + + "Error: ${e.message}", e ) + if (e is IndexClosedException) { + throw e + } } } } @@ -821,7 +827,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String?, docIds: List? 
= null, fieldsToFetch: List, nonPercolateSearchesTimeTaken: AtomicLong, @@ -832,10 +837,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (query != null) { - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) - } - if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } @@ -846,12 +847,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .source( SearchSourceBuilder() .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) .query(boolQueryBuilder) .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { - logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") request.source().fetchSource(false) for (field in fieldsToFetch) { request.source().fetchField(field) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt deleted file mode 100644 index 0caad1f4a..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.model - -import org.opensearch.commons.alerting.model.DocLevelQuery - -data class DocumentExecutionContext( - val queries: List, - val lastRunContext: Map, - val updatedLastRunContext: Map -) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt new file mode 100644 index 000000000..97156eb96 --- /dev/null 
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.DocLevelQuery + +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, + val updatedLastRunContext: MutableMap, + val indexName: String, + val concreteIndexName: String, + val conflictingFields: List, + val docIds: List? = null +)