Skip to content

Commit

Permalink
refactor doc level monitor to perform a percolate queru for docs from…
Browse files Browse the repository at this point in the history
… each shard instead of performing one percolate query on docs from all shards

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Dec 20, 2023
1 parent 903a6d9 commit 99b38e2
Showing 1 changed file with 98 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -219,39 +220,18 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(
fetchDataAndExecutePercolateQueriesPerShard(
monitor,
monitorCtx,
docExecutionContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
matchingDocIdsPerIndex?.get(concreteIndexName),
monitorMetadata,
inputRunResults,
docsToQueries
)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
updatedIndexName,
concreteIndexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id
.replace("_${updatedIndexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
}
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
Expand Down Expand Up @@ -598,43 +578,86 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return allShards.filter { it.primary() }.size
}

private suspend fun getMatchingDocs(
/** 1. Fetch data per shard for given index. (only 10000 docs are fetched. needs to be converted to scroll if performant enough)
* 2. Transform documents to submit to percolate query
* 3. perform percolate queries
* 4. update docToQueries Map with all hits from percolate queries
* */
private suspend fun fetchDataAndExecutePercolateQueriesPerShard(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String,
concreteIndex: String,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
docIds: List<String>? = null,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
for (i: Int in 0 until count) {
val transformedDocs = mutableListOf<Pair<String, BytesReference>>()
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

val hits: SearchHits = searchShard(
monitorCtx,
concreteIndex,
concreteIndexName,
shard,
prevSeqNo,
maxSeqNo,
null,
docIds
)
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
monitor.id,
conflictingFields
)
)
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
e
)
}

if (transformedDocs.isNotEmpty()) {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
monitorCtx,
transformedDocs.map { it.second },
monitor,
monitorMetadata,
indexName,
concreteIndexName
)

if (hits.hits.isNotEmpty()) {
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
percolateQueryResponseHits.forEach { hit ->
val id = hit.id
.replace("_${indexName}_${monitor.id}", "")
.replace("_${concreteIndexName}_${monitor.id}", "")

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
val docIndex = "${transformedDocs[idx].first}|$concreteIndexName"
inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
} catch (e: Exception) {
logger.warn("Failed to run for shard $shard. Error: ${e.message}")
}
}
return matchingDocs
}

/** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo.
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
monitorCtx: MonitorRunnerExecutionContext,
index: String,
Expand Down Expand Up @@ -665,23 +688,24 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
SearchSourceBuilder()
.version(true)
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
.size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k
)
.preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
return response.hits
}

private suspend fun getMatchedQueries(
/** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/
private suspend fun runPercolateQueryOnTransformedDocs(
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String,
concreteIndex: String
concreteIndex: String,
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND))

Expand All @@ -704,50 +728,58 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)

logger.debug("Monitor ${monitor.id}: Executing percolate query for docs from source index $index against query index $queryIndex")
var response: SearchResponse
try {
response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
} catch (e: Exception) {
throw IllegalStateException(
"Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e
"Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " +
"and queryIndex [$queryIndex] for ${docs.size} document(s)", e
)
}

if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search percolate index: $queryIndex")
throw IOException(
"Monitor ${monitor.id}: Failed to search percolate index: $queryIndex. Response status is ${response.status()}"
)
}
return response.hits
}

private fun getAllDocs(
/** Transform field names and index names in all the search hits to format required to run percolate search against them.
* Hits are transformed using method transformDocumentFieldNames() */
private fun transformSearchHitsAndReconstructDocs(
hits: SearchHits,
index: String,
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>
conflictingFields: List<String>,
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)

var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)

val sourceRef = BytesReference.bytes(xContentBuilder)

logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString())

Pair(hit.id, sourceRef)
}
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, BytesReference>? {
try {
val sourceMap = hit.sourceAsMap
transformDocumentFieldNames(
sourceMap,
conflictingFields,
"_${index}_$monitorId",
"_${concreteIndex}_$monitorId",
""
)
var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)
val sourceRef = BytesReference.bytes(xContentBuilder)
logger.debug(
"Monitor $monitorId: Document [${hit.id}] payload after transform for percolate query: ${sourceRef.utf8ToString()}",
)
return Pair(hit.id, sourceRef)
} catch (e: Exception) {
logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e)
// skip any document which we fail to transform because we anyway won't be able to run percolate queries on them.
return null
}
})
}

/**
Expand Down

0 comments on commit 99b38e2

Please sign in to comment.