Commit
optimize sequence number calculation and reduce search requests by n, where n is the number of shards being queried in the execution

Signed-off-by: Surya Sashank Nistala <[email protected]>
eirsep committed Feb 27, 2024
1 parent d93c163 commit bafbaa8
Showing 3 changed files with 95 additions and 85 deletions.
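In broad strokes: the previous implementation issued one dedicated max-seq_no probe search per shard at the start of every execution, while the new code lets the first page of each shard's regular search double as that probe. A back-of-envelope sketch of the request-count saving (helper names are illustrative, not from the commit):

```kotlin
// Before: each of n shards costs 1 max-seq_no probe + p paged searches.
// After:  each of n shards costs p paged searches; page 1 doubles as the probe.
fun requestsBefore(shards: Int, pagesPerShard: Int) = shards * (1 + pagesPerShard)
fun requestsAfter(shards: Int, pagesPerShard: Int) = shards * pagesPerShard

fun main() {
    // Net saving per execution is exactly n, the number of shards queried.
    check(requestsBefore(5, 3) - requestsAfter(5, 3) == 5)
}
```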
@@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
@@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
@@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indices.IndexClosedException
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
@@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
@@ -255,25 +256,29 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
val indexExecutionContext = IndexExecutionContext(
queries,
indexLastRunContext,
indexUpdatedRunContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
indexExecutionContext,
monitorMetadata,
inputRunResults,
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
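The hunk above threads a `(shard, maxSeqNo)` callback into the fetch loop instead of precomputing the maximum per shard. A minimal sketch of that contract, with the caller owning the mutable run context (the shape mirrors the diff; the shard and seq_no values are made up):

```kotlin
// The fetch loop reports the highest seq_no it has observed for a shard;
// the caller folds that into the context persisted for the next run.
val updatedLastRunContext = mutableMapOf<String, Any>("shards_count" to 1)
val updateLastRunContext: (String, String) -> Unit = { shard, maxSeqNo ->
    updatedLastRunContext[shard] = maxSeqNo
}

fun main() {
    updateLastRunContext("0", "42") // e.g. shard "0" advanced to seq_no 42
    check(updatedLastRunContext["0"] == "42")
}
```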
@@ -615,7 +620,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private suspend fun updateLastRunContext(
private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
@@ -624,8 +629,7 @@
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
}
return updatedLastRunContext
}
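`initializeNewLastRunContext` now seeds every shard with a sentinel rather than a searched-for value. The two sentinels in play, as this commit appears to use them (values from `org.opensearch.index.seqno.SequenceNumbers`; the `check`s are only a sketch to document them):

```kotlin
import org.opensearch.index.seqno.SequenceNumbers

fun main() {
    // Placeholder meaning "max seq_no not computed yet"; replaced lazily by the
    // top hit of the shard's first seq_no-sorted page.
    check(SequenceNumbers.UNASSIGNED_SEQ_NO == -2L)
    // Meaning "no operations have happened on this shard"; used as the lower
    // bound when a shard has no previous run context.
    check(SequenceNumbers.NO_OPS_PERFORMED == -1L)
}
```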
@@ -657,33 +661,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
if (response.hits.hits.isEmpty())
return -1L

return response.hits.hits[0].seqNo
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
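For reference, the probe this change deletes: one of these searches was issued per shard per execution, which is exactly the n requests the commit title removes. A self-contained sketch of that request shape (the builder calls are the ones the deleted code used; the function wrapper is illustrative):

```kotlin
import org.opensearch.action.search.SearchRequest
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// The removed getMaxSeqNo issued this per shard: newest operation first,
// size 1, so hits[0].seqNo was the shard's current max sequence number.
fun buildMaxSeqNoProbe(index: String, shard: String): SearchRequest =
    SearchRequest()
        .indices(index)
        .preference("_shards:$shard") // pin the search to a single shard
        .source(
            SearchSourceBuilder()
                .version(true)
                .sort("_seq_no", SortOrder.DESC)
                .seqNoAndPrimaryTerm(true) // return seq_no with each hit
                .query(QueryBuilders.matchAllQuery())
                .size(1)
        )
```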
@@ -697,51 +674,79 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
docIds: List<String>? = null,
indexExecutionCtx: IndexExecutionContext,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
updateLastRunContext: (String, String) -> Unit
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitorCtx,
indexExecutionCtx.concreteIndexName,
shard,
from,
to,
indexExecutionCtx.docIds,
fieldsToBeQueried,
)
if (hits.hits.isEmpty()) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed

val hits: SearchHits = searchShard(
monitorCtx,
concreteIndexName,
shard,
prevSeqNo,
maxSeqNo,
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
monitor.id,
conflictingFields,
updateLastRunContext(shard, hits.hits[0].seqNo.toString())
to = hits.hits[0].seqNo - 10000L
} else {
to -= 10000L
}
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexExecutionCtx.indexName,
indexExecutionCtx.concreteIndexName,
monitor.id,
indexExecutionCtx.conflictingFields,
)
)
)
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
if (
transformedDocs.isNotEmpty() &&
shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
) {
performPercolateQueryAndResetCounters(
monitorCtx,
monitor,
monitorMetadata,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries,
)
}
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
}
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
e
)
if (e is IndexClosedException) {
throw e
}
}
if (
transformedDocs.isNotEmpty() &&
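The loop above is the heart of the change: rather than paging upward from a precomputed maximum, it pages downward from `Long.MAX_VALUE`, and the first seq_no-sorted page reveals the real maximum as a side effect. A simplified, hedged sketch of the control flow (the 10,000-doc window and the callback shape mirror the hunk; `fetchPage` is a hypothetical stand-in for `searchShard`):

```kotlin
// Pages a shard's new documents newest-first. fetchPage(from, to) is assumed
// to return the seq_nos of up to one window of docs in (from, to], descending.
fun pageShardDescending(
    from: Long,                                      // last seq_no seen by the previous run
    fetchPage: (Long, Long) -> List<Long>,
    updateLastRunContext: (String, String) -> Unit,  // same shape as in the diff
    shard: String = "0",
    window: Long = 10_000L,                          // page size used in the hunk
) {
    var to = Long.MAX_VALUE                          // max seq_no not yet known
    while (to >= from) {
        val seqNos = fetchPage(from, to)
        if (seqNos.isEmpty()) {
            if (to == Long.MAX_VALUE) {
                // no new docs at all: keep the previous cursor
                updateLastRunContext(shard, from.toString())
            }
            break
        }
        if (to == Long.MAX_VALUE) {
            updateLastRunContext(shard, seqNos.first().toString()) // top hit = shard max
            to = seqNos.first() - window
        } else {
            to -= window
        }
        // ... transform hits and flush percolate queries as the hunk does ...
    }
}
```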
@@ -833,6 +838,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000)
)
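With the descending `_seq_no` sort and `seqNoAndPrimaryTerm(true)` added here, every page search can surface the shard maximum, making the dedicated probe unnecessary. A sketch of the page-search source after this hunk, assuming the bool query carries a `_seq_no` range clause for the (from, to] window (that clause is built outside the visible hunk, so this is an assumption):

```kotlin
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// Illustrative window bounds; in the runner these come from the paging loop.
val from = -1L          // NO_OPS_PERFORMED lower bound
val to = Long.MAX_VALUE // first page: max seq_no still unknown

val boolQueryBuilder = QueryBuilders.boolQuery()
    .filter(QueryBuilders.rangeQuery("_seq_no").gt(from).lte(to)) // assumed clause
val source = SearchSourceBuilder()
    .version(true)
    .sort("_seq_no", SortOrder.DESC) // newest first: hits[0].seqNo is the shard max
    .seqNoAndPrimaryTerm(true)
    .query(boolQueryBuilder)
    .size(10_000)                    // one window per request, as in the hunk
```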
@@ -846,7 +853,6 @@
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTakenStat += response.took.millis

This file was deleted (evidently the DocumentExecutionContext model, superseded by the IndexExecutionContext added below).

@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery

data class IndexExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: MutableMap<String, Any>,
val updatedLastRunContext: MutableMap<String, Any>,
val indexName: String,
val concreteIndexName: String,
val conflictingFields: List<String>,
val docIds: List<String>? = null
)
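A minimal construction example for the new carrier type (all values illustrative; `lastRunContext` maps each shard id to its seq_no cursor string, plus a `shards_count` entry, as the runner expects):

```kotlin
import org.opensearch.alerting.model.IndexExecutionContext

// Bundles everything the per-shard fetch loop needs for one monitored index.
val ctx = IndexExecutionContext(
    queries = emptyList(),             // DocLevelQuery list from the monitor input
    lastRunContext = mutableMapOf<String, Any>("shards_count" to 1, "0" to "41"),
    updatedLastRunContext = mutableMapOf<String, Any>("shards_count" to 1, "0" to "-2"), // UNASSIGNED sentinel
    indexName = "logs-alias",          // index name/pattern as configured on the monitor
    concreteIndexName = "logs-000001", // resolved concrete index behind it
    conflictingFields = emptyList(),
    docIds = null,                     // null = consider all new documents
)
```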
