optimize sequence number calculation and reduce search requests in doc level monitor execution #1445

Merged 6 commits on Mar 1, 2024

Changes from all commits
@@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
@@ -323,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
@@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
@@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
@@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indices.IndexClosedException
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
@@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
@@ -255,25 +256,29 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
val indexExecutionContext = IndexExecutionContext(
queries,
indexLastRunContext,
indexUpdatedRunContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
indexExecutionContext,
monitorMetadata,
inputRunResults,
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
@@ -615,7 +620,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private suspend fun updateLastRunContext(
Member: Are we sure we do not need suspend anymore? I recall we needed it before.

Member Author: We needed suspend earlier because the method made a search call to calculate the max seq_no. We have removed that search call, since the max seq_no is now calculated in the first pass, so suspend is no longer needed.
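For context on where the max seq_no now comes from, here is a hedged sketch of the two-phase flow this PR introduces (names simplified; not the exact plugin code):

```kotlin
import org.opensearch.index.seqno.SequenceNumbers

// Phase 1 (synchronous, no I/O): every shard entry starts as a placeholder.
// This is why initializeNewLastRunContext below no longer needs `suspend`.
val updatedLastRunContext = mutableMapOf<String, Any>(
    "0" to SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
)

// Phase 2 (during the existing shard fetch): the first page of hits is sorted
// by _seq_no descending, so its top hit carries the shard's max seq_no, which
// the callback writes back, replacing the old dedicated match_all search.
val updateLastRunContext: (String, String) -> Unit = { shard, maxSeqNo ->
    updatedLastRunContext[shard] = maxSeqNo
}
```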

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
@@ -624,8 +629,7 @@
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
}
return updatedLastRunContext
}
@@ -657,33 +661,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
if (response.hits.hits.isEmpty())
return -1L

return response.hits.hits[0].seqNo
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
@@ -697,51 +674,79 @@
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
docIds: List<String>? = null,
indexExecutionCtx: IndexExecutionContext,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
updateLastRunContext: (String, String) -> Unit
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

val hits: SearchHits = searchShard(
monitorCtx,
concreteIndexName,
shard,
prevSeqNo,
maxSeqNo,
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
monitor.id,
conflictingFields,
val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
var to: Long = Long.MAX_VALUE
while (to >= from) {
Member: Why do we need this while loop? Also, it seems like we reduce TO by 10000 from the currently fetched seq_no?

Member Author: Take the example of one shard. In the current iteration: previous seq_no = 10000, current max seq_no = 40000.

First iteration of searchShard (which also determines the max sequence number): TO = Long.MAX_VALUE, FROM = prev_seq_no; this calculates the max seq_no.
Second iteration: TO = max_seq_no - 10000, FROM = prev_seq_no.
Third iteration: TO = TO - 10000, FROM = prev_seq_no.

In the first iteration we get the first 10k results and also calculate the max sequence number. In the second iteration we reduce TO to max_seq_no - 10000 and query again, and we continue this loop until we have seen all docs down to the previous sequence number.

Member Author: Made a change to set the TO variable of the next iteration to 1 less than the least seq_no from the list of docs fetched in the current search.
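A minimal, self-contained sketch of the windowing described in this thread (illustrative only; pageShardBackwards and fetchPage are hypothetical stand-ins for the shard loop and searchShard, and fetchPage is assumed to return seq_nos sorted descending, capped at the page size):

```kotlin
/**
 * Pages a shard backwards by seq_no and returns the shard's max seq_no.
 * fetchPage stands in for searchShard and is assumed to return seq_nos
 * sorted descending, at most one page's worth per call.
 */
fun pageShardBackwards(
    prevSeqNo: Long, // last seq_no seen by the previous monitor run (exclusive)
    fetchPage: (from: Long, to: Long) -> List<Long>,
): Long {
    val from = prevSeqNo
    var to = Long.MAX_VALUE
    var maxSeqNo = prevSeqNo
    while (to >= from) {
        val seqNos = fetchPage(from, to)
        if (seqNos.isEmpty()) break
        if (to == Long.MAX_VALUE) {
            // The first page doubles as the max seq_no probe, which is what
            // removed the old per-shard match_all search.
            maxSeqNo = seqNos.first()
        }
        // Next window ends one below the least seq_no fetched so far.
        to = seqNos.last() - 1
    }
    return maxSeqNo
}

fun main() {
    // Worked example from this thread: prev seq_no 10000, shard max 40000,
    // page size 10000 -> pages [40000..30001], [30000..20001], [20000..10001].
    val shardDocs = (40000L downTo 10001L)
    val max = pageShardBackwards(10000L) { from, to ->
        shardDocs.filter { it in (from + 1)..to }.take(10000)
    }
    println(max) // 40000
}
```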

val hits: SearchHits = searchShard(
monitorCtx,
indexExecutionCtx.concreteIndexName,
shard,
from,
to,
indexExecutionCtx.docIds,
fieldsToBeQueried,
)
)
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
if (hits.hits.isEmpty()) {
if (to == Long.MAX_VALUE) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs
}
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed
updateLastRunContext(shard, hits.hits[0].seqNo.toString())
}
val leastSeqNoFromHits = hits.hits.last().seqNo
to = leastSeqNoFromHits - 1
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexExecutionCtx.indexName,
indexExecutionCtx.concreteIndexName,
monitor.id,
indexExecutionCtx.conflictingFields,
)
)
if (
transformedDocs.isNotEmpty() &&
shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
) {
performPercolateQueryAndResetCounters(
monitorCtx,
monitor,
monitorMetadata,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries,
)
}
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
}
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
e
)
if (e is IndexClosedException) {
throw e
}
}
if (
transformedDocs.isNotEmpty() &&
@@ -833,8 +838,10 @@
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000)
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

@@ -846,7 +853,6 @@
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
@@ -55,4 +55,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
)
@@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
@@ -202,6 +203,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

monitorCtx.docLevelMonitorShardFetchSize =
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) {
monitorCtx.docLevelMonitorShardFetchSize = it
}

return this
}

This file was deleted.

@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery

/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */
data class IndexExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: MutableMap<String, Any>,
val updatedLastRunContext: MutableMap<String, Any>,
val indexName: String,
val concreteIndexName: String,
val conflictingFields: List<String>,
val docIds: List<String>? = null,
)
@@ -20,6 +20,7 @@ class AlertingSettings {
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10
const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
@@ -38,6 +39,16 @@
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Purely a setting used to verify seq_no calculation
*/
val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_shard_fetch_size",
DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
1,
10000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
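
As a usage note: the new fetch size setting is dynamic, so it can be tuned on a live cluster. A hedged sketch using the transport client (the setting key comes from the definition above; the blocking call and helper name are illustrative assumptions, not plugin code):

```kotlin
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings

// Sketch: dynamically lower the shard fetch size, e.g. to exercise the
// seq_no windowing with smaller pages. Assumes an initialized Client.
fun lowerShardFetchSize(client: Client) {
    val request = ClusterUpdateSettingsRequest().transientSettings(
        Settings.builder()
            .put("plugins.alerting.monitor.doc_level_monitor_shard_fetch_size", 1000)
            .build()
    )
    // Blocking call for brevity; real code would pass an ActionListener instead.
    val response = client.admin().cluster().updateSettings(request).actionGet()
    check(response.isAcknowledged) { "cluster settings update was not acknowledged" }
}
```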

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
@@ -189,7 +189,15 @@ class TransportIndexMonitorAction @Inject constructor(
else (it as DocLevelMonitorInput).indices
indices.addAll(inputIndices)
}
val searchRequest = SearchRequest().indices(*indices.toTypedArray())
val updatedIndices = indices.map { index ->
if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) {
val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex
metadata?.index?.name ?: index
} else {
index
}
}
val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray())
.source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
client.search(
searchRequest,