Skip to content

Commit

Permalink
Optimize computation of max seq nos and remove all redundant search requests
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 8, 2024
1 parent d324f8d commit da6afbc
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
Expand Down Expand Up @@ -60,7 +60,8 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indices.IndexClosedException
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
Expand Down Expand Up @@ -91,6 +92,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
Expand Down Expand Up @@ -209,11 +211,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
nonPercolateSearchesTimeTaken
concreteIndexName
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand Down Expand Up @@ -251,18 +252,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
logger.error("PERF_DEBUG: Monitor ${monitor.id} Query field names: ${fieldsToBeQueried.joinToString()}}")
val indexExecutionContext = IndexExecutionContext(
queries,
indexLastRunContext,
indexUpdatedRunContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
indexExecutionContext,
monitorMetadata,
inputRunResults,
docsToQueries,
Expand All @@ -273,8 +277,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
ArrayList(fieldsToBeQueried),
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried
)
totalDocsQueried,
docTransformTimeTaken
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
Expand All @@ -295,7 +302,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}
val took = System.currentTimeMillis() - queryingStartTimeMillis
logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in $executionId")
logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in $executionId")
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
Expand Down Expand Up @@ -376,6 +383,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
)
logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken")
logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on transforming doc fields in millis: $docTransformTimeTaken")
logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried")
}
}
Expand Down Expand Up @@ -590,18 +598,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private suspend fun updateLastRunContext(
private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
nonPercolateSearchesTimeTaken: AtomicLong
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken)
updatedLastRunContext[shard] = maxSeqNo.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
}
return updatedLastRunContext
}
Expand Down Expand Up @@ -650,7 +656,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
JvmStats.jvmStats()
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
Expand All @@ -673,11 +678,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
docIds: List<String>? = null,
indexExecutionCtx: IndexExecutionContext,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
Expand All @@ -688,43 +689,47 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
fieldsToBeQueried: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
totalDocsQueried: AtomicLong,
docTransformTimeTake: AtomicLong,
updateLastRunContext: (String, String) -> Unit
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
continue
}
var from = prevSeqNo ?: 0
var numDocsLeftToQueryFromShard = maxSeqNo - from

while (numDocsLeftToQueryFromShard > 0) {
val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000
val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitorCtx,
concreteIndexName,
indexExecutionCtx.concreteIndexName,
shard,
from,
to,
null,
docIds,
indexExecutionCtx.docIds,
fieldsToBeQueried,
nonPercolateSearchesTimeTaken
)
from = to + 1
numDocsLeftToQueryFromShard -= 10000
val startTime = Instant.now()
if (hits.hits.isEmpty()) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString())
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed

updateLastRunContext(shard, hits.hits[0].seqNo.toString())
to = hits.hits[0].seqNo - 10000L
} else {
to -= 10000L
}
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
indexExecutionCtx.indexName,
indexExecutionCtx.concreteIndexName,
monitor.id,
conflictingFields,
indexExecutionCtx.conflictingFields,
docsSizeInBytes
)
)
Expand All @@ -746,17 +751,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
totalDocsQueried
)
}
logger.error(
"PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " +
"took ${Instant.now().epochSecond - startTime.epochSecond}"
)
docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime)
}
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
e
)
if (e is IndexClosedException) {
throw e
}
}
}
}
Expand Down Expand Up @@ -821,7 +827,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
fieldsToFetch: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
Expand All @@ -832,10 +837,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (query != null) {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
Expand All @@ -846,12 +847,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k
)

if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) {
logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}")
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery

/**
 * Immutable per-index state carried through one execution of a document-level monitor
 * against a single monitored index.
 *
 * @property queries the doc-level queries to evaluate against documents of this index
 * @property lastRunContext per-shard context from the previous run; values are stringified
 *   sequence numbers keyed by shard id — TODO confirm the exact key set against the caller
 * @property updatedLastRunContext per-shard context being built during the current run
 *   (updated with the newly observed max seq no per shard)
 * @property indexName the index name as configured on the monitor (may be an alias or data stream)
 * @property concreteIndexName the resolved concrete index that backs [indexName]
 * @property conflictingFields field names with conflicting mappings across the monitored indices
 * @property docIds optional restriction to specific document ids; null means no restriction
 */
data class IndexExecutionContext(
    val queries: List<DocLevelQuery>,
    val lastRunContext: MutableMap<String, Any>,
    val updatedLastRunContext: MutableMap<String, Any>,
    val indexName: String,
    val concreteIndexName: String,
    val conflictingFields: List<String>,
    val docIds: List<String>? = null
)

0 comments on commit da6afbc

Please sign in to comment.