Skip to content

Commit

Permalink
perform percolate query only if threshold setting breached or at the …
Browse files Browse the repository at this point in the history
…end of collecting data from all indices

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jan 4, 2024
1 parent 8622df2 commit cb92f54
Showing 1 changed file with 89 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import java.io.IOException
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collectors
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -153,12 +154,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

/* Contains list of docs source that in memory to submit to percolate query against query index.
/* Contains list of docs source that are held in memory to submit to percolate query against query index.
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, BytesReference>>()
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val docsSizeInBytes = AtomicLong(0)
var lastUpdatedIndexName: String? = null
var lastConcreteIndexName: String? = null
val concreteIndicesSeenSoFar = mutableListOf<String>()
docLevelMonitorInput.indices.forEach { indexName ->

var concreteIndices = IndexUtils.resolveAllIndices(
Expand All @@ -181,16 +181,17 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}
}
concreteIndicesSeenSoFar.addAll(concreteIndices)
val updatedIndexName = indexName.replace("*", "_")
lastUpdatedIndexName = updatedIndexName
// lastUpdatedIndexName = updatedIndexName
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
lastConcreteIndexName = concreteIndexName
// lastConcreteIndexName = concreteIndexName
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
Expand Down Expand Up @@ -232,7 +233,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

fetchDataAndExecutePercolateQueriesPerShard(
fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
Expand All @@ -244,10 +245,27 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
inputRunResults,
docsToQueries,
transformedDocs,
docsSizeInBytes
docsSizeInBytes,
docLevelMonitorInput.indices,
concreteIndicesSeenSoFar
)
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
the percolate query at the end*/
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
docLevelMonitorInput.indices,
concreteIndicesSeenSoFar,
inputRunResults,
docsToQueries
)
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
Expand Down Expand Up @@ -593,12 +611,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return allShards.filter { it.primary() }.size
}

/** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough)
* 2. Transform documents to submit to percolate query
* 3. perform percolate queries
* 4. update docToQueries Map with all hits from percolate queries
* */
private suspend fun fetchDataAndExecutePercolateQueriesPerShard(
/** 1. Fetch data per shard for given index. (only 10000 docs are fetched.
* needs to be converted to scroll if not performant enough)
* 2. Transform documents to conform to format required for percolate query
* 3a. Check if docs in memory are crossing threshold defined by setting.
* 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
Expand All @@ -609,8 +627,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
transformedDocs: MutableList<Pair<String, BytesReference>>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down Expand Up @@ -645,66 +665,50 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
e
)
}
if (transformedDocs.isNotEmpty() || isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) {
if (transformedDocs.isNotEmpty() && isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
indexName,
concreteIndexName,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries
)
}
}
/* if all shards are covered still in-memory docs size limit is not breached we would need to submit
the percolate query at the end*/
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
indexName,
concreteIndexName,
inputRunResults,
docsToQueries
)
}
}

private suspend fun performPercolateQueryAndResetCounters(
monitorCtx: MonitorRunnerExecutionContext,
transformedDocs: MutableList<Pair<String, BytesReference>>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
indexName: String,
concreteIndexName: String,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
monitorCtx,
transformedDocs.map { it.second },
transformedDocs,
monitor,
monitorMetadata,
indexName,
concreteIndexName
concreteIndices,
monitorInputIndices
)

percolateQueryResponseHits.forEach { hit ->
val id = hit.id
.replace("_${indexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

var id = hit.id
concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") }
monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") }
val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${transformedDocs[idx].first}|$concreteIndexName"
val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
Expand Down Expand Up @@ -761,50 +765,60 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
/** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/
private suspend fun runPercolateQueryOnTransformedDocs(
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
docs: MutableList<Pair<String, TransformedDocDto>>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String,
concreteIndex: String,
concreteIndices: List<String>,
monitorInputIndices: List<String>,
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON)
val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.termsQuery("index", indices))
val percolateQueryBuilder =
PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON)
if (monitor.id.isNotEmpty()) {
boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND))
}
boolQueryBuilder.filter(percolateQueryBuilder)

val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}"
val queryIndices =
docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct()
if (queryIndices.isEmpty()) {
val message =
"Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" +
" sourceIndices: $monitorInputIndices"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex).preference(Preference.PRIMARY_FIRST.type())

val searchRequest =
SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type())
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
logger.debug("Monitor ${monitor.id}: Executing percolate query for docs from source index $index against query index $queryIndex")
logger.debug(
"Monitor ${monitor.id}: " +
"Executing percolate query for docs from source indices " +
"$monitorInputIndices against query index $queryIndices"
)
var response: SearchResponse
try {
response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
} catch (e: Exception) {
throw IllegalStateException(
"Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " +
"and queryIndex [$queryIndex] for ${docs.size} document(s)",
"Monitor ${monitor.id}:" +
" Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " +
"and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)",
e
)
}

if (response.status() !== RestStatus.OK) {
throw IOException(
"Monitor ${monitor.id}: Failed to search percolate index: $queryIndex. Response status is ${response.status()}"
"Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " +
"Response status is ${response.status()}"
)
}
return response.hits
Expand All @@ -819,8 +833,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorId: String,
conflictingFields: List<String>,
docsSizeInBytes: AtomicLong,
): List<Pair<String, BytesReference>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, BytesReference>? {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
transformDocumentFieldNames(
Expand All @@ -833,7 +847,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)
val sourceRef = BytesReference.bytes(xContentBuilder)
docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed())
return Pair(hit.id, sourceRef)
return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef))
} catch (e: Exception) {
logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e)
// skip any document which we fail to transform because we anyway won't be able to run percolate queries on them.
Expand Down Expand Up @@ -903,9 +917,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (thresholdPercentage > 100 || thresholdPercentage < 0) {
thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings)
}
val heapMaxBytes = JvmStats.jvmStats().mem.heapMax.bytes
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

return docsBytesSize > thresholdBytes
}

/**
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
*/
private data class TransformedDocDto(
var indexName: String,
var concreteIndexName: String,
var docId: String,
var docSource: BytesReference
)
}

0 comments on commit cb92f54

Please sign in to comment.