query shard up to max sequence number instead of just 10000
Signed-off-by: Surya Sashank Nistala <[email protected]>
eirsep committed Jan 27, 2024
1 parent 009cd61 commit 743bda8
Showing 6 changed files with 79 additions and 44 deletions.
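
In outline: the previous per-shard search was issued once and capped at 10,000 documents by the request's `size`, so any documents beyond that were skipped until the next run. This commit instead pages through the shard in windows of at most 10,000 sequence numbers until the shard's max sequence number is covered. A minimal, self-contained sketch of the pagination loop follows; `fetchWindow`, `PAGE_SIZE`, and the literal sequence numbers are illustrative stand-ins, not code from this commit:

```kotlin
// Sketch of the windowed fetch introduced in this commit: walk (prevSeqNo, maxSeqNo]
// in pages of at most 10,000 sequence numbers instead of one size-capped search.
const val PAGE_SIZE = 10_000L

// Hypothetical stand-in for the real searchShard() call, which filters on
// _seq_no greater than `from` and up to `to`.
fun fetchWindow(from: Long, to: Long) {
    println("querying shard for _seq_no in ($from, $to]")
}

fun main() {
    val prevSeqNo = 0L      // last sequence number seen by the previous run
    val maxSeqNo = 25_000L  // current max sequence number on the shard

    var from = prevSeqNo
    var remaining = maxSeqNo - from
    while (remaining > 0) {
        // Cap each window at PAGE_SIZE, mirroring the loop in the diff below.
        val to = from + if (remaining < PAGE_SIZE) remaining else PAGE_SIZE
        fetchWindow(from, to)
        from = to + 1
        remaining -= PAGE_SIZE
    }
}
```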
@@ -33,7 +33,6 @@ import org.opensearch.alerting.workflow.WorkflowRunContext
 import org.opensearch.client.Client
 import org.opensearch.client.node.NodeClient
 import org.opensearch.cluster.metadata.IndexMetadata
-import org.opensearch.cluster.routing.Preference
 import org.opensearch.cluster.routing.ShardRouting
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.xcontent.XContentFactory
@@ -162,6 +161,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val docsSizeInBytes = AtomicLong(0)
         val concreteIndicesSeenSoFar = mutableListOf<String>()
         val updatedIndexNames = mutableListOf<String>()
+        val queryingStartTimeMillis = System.currentTimeMillis()
         docLevelMonitorInput.indices.forEach { indexName ->

             var concreteIndices = IndexUtils.resolveAllIndices(
@@ -284,6 +284,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                     docsToQueries
                 )
             }
+            val took = System.currentTimeMillis() - queryingStartTimeMillis
+            logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in $executionId")
             monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

             /*
@@ -303,6 +305,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             // If there are no triggers defined, we still want to generate findings
             if (monitor.triggers.isEmpty()) {
                 if (dryrun == false && monitor.id != Monitor.NO_ID) {
+                    logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
                     docsToQueries.forEach {
                         val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
                         createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
@@ -338,7 +341,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             } else {
                 onSuccessfulMonitorRun(monitorCtx, monitor)
             }
-
+            logger.error(
+                "Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " +
+                    "execution $executionId"
+            )
             MonitorMetadataService.upsertMetadata(
                 monitorMetadata.copy(lastRunContext = updatedLastRunContext),
                 true
@@ -533,7 +539,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             publishFinding(monitor, monitorCtx, finding)
         } catch (e: Exception) {
             // suppress exception
-            logger.error("Optional finding callback failed", e)
         }
         return finding.id
     }
@@ -657,50 +662,65 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             try {
                 val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
                 val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
                 if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
                     continue
                 }
+                var from = prevSeqNo ?: 0
+                var numDocsLeftToQueryFromShard = maxSeqNo - from

-                val hits: SearchHits = searchShard(
-                    monitorCtx,
-                    concreteIndexName,
-                    shard,
-                    prevSeqNo,
-                    maxSeqNo,
-                    null,
-                    docIds,
-                    fieldsToBeQueried
-                )
-                transformedDocs.addAll(
-                    transformSearchHitsAndReconstructDocs(
-                        hits,
-                        indexName,
-                        concreteIndexName,
-                        monitor.id,
-                        conflictingFields,
-                        docsSizeInBytes
-                    )
-                )
+                while (numDocsLeftToQueryFromShard > 0) {
+                    val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000
+                    val hits: SearchHits = searchShard(
+                        monitorCtx,
+                        concreteIndexName,
+                        shard,
+                        from,
+                        to,
+                        null,
+                        docIds,
+                        fieldsToBeQueried
+                    )
+                    from = to + 1
+                    numDocsLeftToQueryFromShard -= 10000
+                    val startTime = Instant.now()
+                    transformedDocs.addAll(
+                        transformSearchHitsAndReconstructDocs(
+                            hits,
+                            indexName,
+                            concreteIndexName,
+                            monitor.id,
+                            conflictingFields,
+                            docsSizeInBytes
+                        )
+                    )
+                    if (
+                        transformedDocs.isNotEmpty() &&
+                        shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx)
+                    ) {
+                        performPercolateQueryAndResetCounters(
+                            monitorCtx,
+                            transformedDocs,
+                            docsSizeInBytes,
+                            monitor,
+                            monitorMetadata,
+                            monitorInputIndices,
+                            concreteIndices,
+                            inputRunResults,
+                            docsToQueries
+                        )
+                    }
+                    logger.error(
+                        "PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " +
+                            "took ${Instant.now().epochSecond - startTime.epochSecond}"
+                    )
+                }
             } catch (e: Exception) {
                 logger.error(
                     "Monitor ${monitor.id} :" +
                         " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
                     e
                 )
             }
-            if (
-                transformedDocs.isNotEmpty() &&
-                shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx)
-            ) {
-                performPercolateQueryAndResetCounters(
-                    monitorCtx,
-                    transformedDocs,
-                    docsSizeInBytes,
-                    monitor,
-                    monitorMetadata,
-                    monitorInputIndices,
-                    concreteIndices,
-                    inputRunResults,
-                    docsToQueries
-                )
-            }
         }
     }

@@ -751,7 +771,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         }
     }

-    /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo.
+    /** Executes search query on given shard of given index to fetch docs with sequence number greater than prevSeqNo.
      * This method hence fetches only docs from shard which haven't been queried before
      */
     private suspend fun searchShard(
@@ -787,7 +807,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
                 .query(boolQueryBuilder)
                 .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k
         )
-            .preference(Preference.PRIMARY_FIRST.type())

         if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) {
             logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString()}")
@@ -833,7 +852,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         }

         val searchRequest =
-            SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type())
+            SearchRequest().indices(*queryIndices.toTypedArray())
         val searchSourceBuilder = SearchSourceBuilder()
         searchSourceBuilder.query(boolQueryBuilder)
         searchRequest.source(searchSourceBuilder)
@@ -863,6 +882,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             )
         }
         logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
+        logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}")
         return response.hits
     }
     /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/
@@ -108,6 +108,7 @@ object MonitorMetadataService :
                 primaryTerm = response.primaryTerm
             )
         } catch (e: Exception) {
+            log.error("Failed to upsert metadata", e)
             throw AlertingException.wrap(e)
         }
     }
@@ -257,11 +257,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
         when (job) {
             is Workflow -> {
                 launch {
+                    logger.debug(
+                        "PERF_DEBUG: executing workflow ${job.id} on node " +
+                            monitorCtx.clusterService!!.state().nodes().localNode.id
+                    )
                     runJob(job, periodStart, periodEnd, false)
                 }
             }
             is Monitor -> {
                 launch {
+                    logger.debug(
+                        "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
+                            monitorCtx.clusterService!!.state().nodes().localNode.id
+                    )
                     runJob(job, periodStart, periodEnd, false)
                 }
             }
@@ -395,7 +395,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) {
             }
             bulkResponse.forEach { bulkItemResponse ->
                 if (bulkItemResponse.isFailed) {
-                    log.debug(bulkItemResponse.failureMessage)
+                    log.error("Failed to index doc level query for monitor $monitorId due to " + bulkItemResponse.failureMessage)
                 }
             }
         }
@@ -60,7 +60,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
         val isTempWorkflow = dryRun || workflow.id == Workflow.NO_ID

         val executionId = generateExecutionId(isTempWorkflow, workflow)
-
+        logger.debug(
+            "Workflow ${workflow.id} execution began at $workflowExecutionStartTime" +
+                " on node ${monitorCtx.clusterService!!.localNode().id}"
+        )
         val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
             workflow = workflow,
             skipIndex = isTempWorkflow,
@@ -227,6 +230,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
             )
         }
         workflowRunResult.executionEndTime = Instant.now()
+        logger.debug("Workflow ${workflow.id} execution completed at ${workflowRunResult.executionEndTime}")
         return workflowRunResult
     }

@@ -462,11 +462,13 @@ class JobSweeper(

     private fun isOwningNode(shardId: ShardId, jobId: JobId): Boolean {
         val localNodeId = clusterService.localNode().id
-        val shardNodeIds = clusterService.state().routingTable.shardRoutingTable(shardId)
+        val shardRoutingTable = clusterService.state().routingTable.shardRoutingTable(shardId)
+        val shardNodeIds = shardRoutingTable
             .filter { it.active() }
             .map { it.currentNodeId() }
         val shardNodes = ShardNodes(localNodeId, shardNodeIds)
-        return shardNodes.isOwningNode(jobId)
+        val owningNode = shardNodes.isOwningNode(jobId)
+        return owningNode
     }
 }
