Skip to content

Commit

Permalink
optimize sequence number calculation and reduce search requests in do…
Browse files Browse the repository at this point in the history
…c level monitor execution (opensearch-project#1445)

* optimize sequence number calculation and reduce search requests by n where n is number of shards being queried in the executino

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix tests

Signed-off-by: Surya Sashank Nistala <[email protected]>

* optimize check indices and execute to query only write index of aliases and datastreams during monitor creation

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix test

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add javadoc

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add tests to verify seq_no calculation

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored and engechas committed Mar 18, 2024
1 parent f599956 commit 6ebdbdc
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
Expand Down Expand Up @@ -287,6 +288,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
Expand All @@ -30,7 +30,6 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
Expand All @@ -56,6 +55,8 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indices.IndexClosedException
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchHit
Expand Down Expand Up @@ -199,7 +200,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
Expand Down Expand Up @@ -247,24 +248,28 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
val indexExecutionContext = IndexExecutionContext(
queries,
indexLastRunContext,
indexUpdatedRunContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
indexExecutionContext,
monitorMetadata,
inputRunResults,
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
Expand Down Expand Up @@ -582,7 +587,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private suspend fun updateLastRunContext(
private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
Expand All @@ -591,8 +596,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
}
return updatedLastRunContext
}
Expand Down Expand Up @@ -624,33 +628,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
if (response.hits.hits.isEmpty())
return -1L

return response.hits.hits[0].seqNo
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
Expand All @@ -664,49 +641,78 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
indexExecutionCtx: IndexExecutionContext,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
updateLastRunContext: (String, String) -> Unit
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

val hits: SearchHits = searchShard(
monitorCtx,
concreteIndexName,
shard,
prevSeqNo,
maxSeqNo,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
monitor.id,
conflictingFields,
val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitorCtx,
indexExecutionCtx.concreteIndexName,
shard,
from,
to,
fieldsToBeQueried,
)
)
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
if (hits.hits.isEmpty()) {
if (to == Long.MAX_VALUE) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs
}
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed
updateLastRunContext(shard, hits.hits[0].seqNo.toString())
}
val leastSeqNoFromHits = hits.hits.last().seqNo
to = leastSeqNoFromHits - 1
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexExecutionCtx.indexName,
indexExecutionCtx.concreteIndexName,
monitor.id,
indexExecutionCtx.conflictingFields,
)
)
if (
transformedDocs.isNotEmpty() &&
shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
) {
performPercolateQueryAndResetCounters(
monitorCtx,
monitor,
monitorMetadata,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries,
)
}
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
}
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
e
)
if (e is IndexClosedException) {
throw e
}
}
if (
transformedDocs.isNotEmpty() &&
Expand Down Expand Up @@ -793,8 +799,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000)
.size(monitorCtx.docLevelMonitorShardFetchSize)
)

if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
Expand All @@ -805,7 +813,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
Expand Down Expand Up @@ -189,6 +190,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

monitorCtx.docLevelMonitorShardFetchSize =
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) {
monitorCtx.docLevelMonitorShardFetchSize = it
}

return this
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery

/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */
data class IndexExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: MutableMap<String, Any>,
val updatedLastRunContext: MutableMap<String, Any>,
val indexName: String,
val concreteIndexName: String,
val conflictingFields: List<String>,
val docIds: List<String>? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class AlertingSettings {
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10
const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand All @@ -39,6 +40,16 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Purely a setting used to verify seq_no calculation
*/
val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_shard_fetch_size",
DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
1,
10000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,15 @@ class TransportIndexMonitorAction @Inject constructor(
else (it as DocLevelMonitorInput).indices
indices.addAll(inputIndices)
}
val searchRequest = SearchRequest().indices(*indices.toTypedArray())
val updatedIndices = indices.map { index ->
if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) {
val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex
metadata?.index?.name ?: index
} else {
index
}
}
val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray())
.source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
client.search(
searchRequest,
Expand Down
Loading

0 comments on commit 6ebdbdc

Please sign in to comment.