From 8b07c179d6df644c1744946514105ad216bb311e Mon Sep 17 00:00:00 2001
From: Surya Sashank Nistala
Date: Tue, 27 Feb 2024 02:02:43 -0800
Subject: [PATCH] optimize sequence number calculation and reduce search
 requests by n, where n is the number of shards being queried in the execution

Signed-off-by: Surya Sashank Nistala
---
 .../alerting/DocumentLevelMonitorRunner.kt   | 148 +++++++++---------
 .../model/DocumentExecutionContext.kt        |  14 --
 .../alerting/model/IndexExecutionContext.kt  |  18 +++
 3 files changed, 95 insertions(+), 85 deletions(-)
 delete mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt
 create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index b23e81355..e3f6f1b68 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
-import org.opensearch.alerting.model.DocumentExecutionContext
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
+import org.opensearch.alerting.model.IndexExecutionContext
 import org.opensearch.alerting.model.InputRunResults
 import org.opensearch.alerting.model.MonitorMetadata
 import org.opensearch.alerting.model.MonitorRunResult
@@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.defaultToPerExecutionAction
 import org.opensearch.alerting.util.getActionExecutionPolicy
 import org.opensearch.alerting.workflow.WorkflowRunContext
-import org.opensearch.client.Client
 import org.opensearch.client.node.NodeClient
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.routing.Preference
@@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.Operator
 import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indices.IndexClosedException
 import org.opensearch.percolator.PercolateQueryBuilderExt
 import org.opensearch.search.SearchHit
 import org.opensearch.search.SearchHits
@@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
                     }

                     // Prepare updatedLastRunContext for each index
-                    val indexUpdatedRunContext = updateLastRunContext(
+                    val indexUpdatedRunContext = initializeNewLastRunContext(
                         indexLastRunContext.toMutableMap(),
                         monitorCtx,
                         concreteIndexName,
@@ -255,25 +256,29 @@
                                 "${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
                         )
                     }
-
-                    // Prepare DocumentExecutionContext for each index
-                    val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
-
-                    fetchShardDataAndMaybeExecutePercolateQueries(
-                        monitor,
-                        monitorCtx,
-                        docExecutionContext,
+                    val indexExecutionContext = IndexExecutionContext(
+                        queries,
+                        indexLastRunContext,
+                        indexUpdatedRunContext,
                         updatedIndexName,
                         concreteIndexName,
                         conflictingFields.toList(),
                         matchingDocIdsPerIndex?.get(concreteIndexName),
+                    )
+
+                    fetchShardDataAndMaybeExecutePercolateQueries(
+                        monitor,
+                        monitorCtx,
+                        indexExecutionContext,
                         monitorMetadata,
                         inputRunResults,
                         docsToQueries,
                         updatedIndexNames,
                         concreteIndicesSeenSoFar,
                         ArrayList(fieldsToBeQueried)
-                    )
+                    ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
+                        indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
+                    }
                 }
             }
             /* if all indices are covered still in-memory docs size limit is not breached we would need to submit
@@ -612,7 +617,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         )
     }

-    private suspend fun updateLastRunContext(
+    private fun initializeNewLastRunContext(
         lastRunContext: Map<String, Any>,
         monitorCtx: MonitorRunnerExecutionContext,
         index: String,
@@ -621,8 +626,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         val updatedLastRunContext = lastRunContext.toMutableMap()
         for (i: Int in 0 until count) {
             val shard = i.toString()
-            val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
-            updatedLastRunContext[shard] = maxSeqNo.toString()
+            updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
         }
         return updatedLastRunContext
     }
@@ -654,33 +658,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         return indexCreationDate > lastExecutionTime.toEpochMilli()
     }
-    /**
-     * Get the current max seq number of the shard. We find it by searching the last document
-     *  in the primary shard.
-     */
-    private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
-        val request: SearchRequest = SearchRequest()
-            .indices(index)
-            .preference("_shards:$shard")
-            .source(
-                SearchSourceBuilder()
-                    .version(true)
-                    .sort("_seq_no", SortOrder.DESC)
-                    .seqNoAndPrimaryTerm(true)
-                    .query(QueryBuilders.matchAllQuery())
-                    .size(1)
-            )
-        val response: SearchResponse = client.suspendUntil { client.search(request, it) }
-        if (response.status() !== RestStatus.OK) {
-            throw IOException("Failed to get max seq no for shard: $shard")
-        }
-        nonPercolateSearchesTimeTakenStat += response.took.millis
-        if (response.hits.hits.isEmpty())
-            return -1L
-
-        return response.hits.hits[0].seqNo
-    }
-
     private fun getShardsCount(clusterService: ClusterService, index: String): Int {
         val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
         return allShards.filter { it.primary() }.size
     }
@@ -694,51 +671,79 @@

     private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docExecutionCtx: DocumentExecutionContext,
-        indexName: String,
-        concreteIndexName: String,
-        conflictingFields: List<String>,
-        docIds: List<String>? = null,
+        indexExecutionCtx: IndexExecutionContext,
         monitorMetadata: MonitorMetadata,
         inputRunResults: MutableMap<String, MutableSet<String>>,
         docsToQueries: MutableMap<String, MutableList<String>>,
         monitorInputIndices: List<String>,
         concreteIndices: List<String>,
         fieldsToBeQueried: List<String>,
+        updateLastRunContext: (String, String) -> Unit
     ) {
-        val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
+        val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
         for (i: Int in 0 until count) {
             val shard = i.toString()
             try {
-                val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
-                val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
+                val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
+                val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
+                var to: Long = Long.MAX_VALUE
+                while (to >= from) {
+                    val hits: SearchHits = searchShard(
+                        monitorCtx,
+                        indexExecutionCtx.concreteIndexName,
+                        shard,
+                        from,
+                        to,
+                        indexExecutionCtx.docIds,
+                        fieldsToBeQueried,
+                    )
+                    if (hits.hits.isEmpty()) {
+                        updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
+                        break
+                    }
+                    if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed
-                val hits: SearchHits = searchShard(
-                    monitorCtx,
-                    concreteIndexName,
-                    shard,
-                    prevSeqNo,
-                    maxSeqNo,
-                    docIds,
-                    fieldsToBeQueried
-                )
-                val startTime = System.currentTimeMillis()
-                transformedDocs.addAll(
-                    transformSearchHitsAndReconstructDocs(
-                        hits,
-                        indexName,
-                        concreteIndexName,
-                        monitor.id,
-                        conflictingFields,
+                        updateLastRunContext(shard, hits.hits[0].seqNo.toString())
+                        to = hits.hits[0].seqNo - 10000L
+                    } else {
+                        to -= 10000L
+                    }
+                    val startTime = System.currentTimeMillis()
+                    transformedDocs.addAll(
+                        transformSearchHitsAndReconstructDocs(
+                            hits,
+                            indexExecutionCtx.indexName,
+                            indexExecutionCtx.concreteIndexName,
+                            monitor.id,
+                            indexExecutionCtx.conflictingFields,
+                        )
                     )
-                )
-                docTransformTimeTakenStat += System.currentTimeMillis() - startTime
+                    if (
+                        transformedDocs.isNotEmpty() &&
+                        shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
+                    ) {
+                        performPercolateQueryAndResetCounters(
+                            monitorCtx,
+                            monitor,
+                            monitorMetadata,
+                            monitorInputIndices,
+                            concreteIndices,
+                            inputRunResults,
+                            docsToQueries,
+                        )
+                    }
+                    docTransformTimeTakenStat += System.currentTimeMillis() - startTime
+                }
             } catch (e: Exception) {
                 logger.error(
                     "Monitor ${monitor.id} :" +
-                        " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
+                        "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
+                            "Error: ${e.message}",
                     e
                 )
+                if (e is IndexClosedException) {
+                    throw e
+                }
             }
             if (
                 transformedDocs.isNotEmpty() &&
@@ -830,6 +835,8 @@
             .source(
                 SearchSourceBuilder()
                     .version(true)
+                    .sort("_seq_no", SortOrder.DESC)
+                    .seqNoAndPrimaryTerm(true)
                     .query(boolQueryBuilder)
                     .size(10000)
             )
@@ -843,7 +850,6 @@
         }
         val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
         if (response.status() !== RestStatus.OK) {
-            logger.error("Failed search shard. Response: $response")
             throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
Response status is ${response.status()}") } nonPercolateSearchesTimeTakenStat += response.took.millis diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt deleted file mode 100644 index 0caad1f4a..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.model - -import org.opensearch.commons.alerting.model.DocLevelQuery - -data class DocumentExecutionContext( - val queries: List, - val lastRunContext: Map, - val updatedLastRunContext: Map -) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt new file mode 100644 index 000000000..97156eb96 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.DocLevelQuery + +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, + val updatedLastRunContext: MutableMap, + val indexName: String, + val concreteIndexName: String, + val conflictingFields: List, + val docIds: List? = null +)