diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 1d0473543..1c52baf89 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -33,7 +33,6 @@ import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.xcontent.XContentFactory @@ -162,6 +161,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() + val queryingStartTimeMillis = System.currentTimeMillis() docLevelMonitorInput.indices.forEach { indexName -> var concreteIndices = IndexUtils.resolveAllIndices( @@ -284,6 +284,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries ) } + val took = System.currentTimeMillis() - queryingStartTimeMillis + logger.error("PERF_DEBUG: Entire query+percolate completed in $took millis in $executionId") monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -303,6 +305,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { + logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor $monitor.id") docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) @@ -338,7 +341,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } else { onSuccessfulMonitorRun(monitorCtx, monitor) } - + logger.error("Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " + + "execution $executionId") MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true @@ -533,7 +537,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { publishFinding(monitor, monitorCtx, finding) } catch (e: Exception) { // suppress exception - logger.error("Optional finding callback failed", e) } return finding.id } @@ -657,27 +660,58 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { + continue + } + var from = prevSeqNo ?: 0 + var numDocsLeftToQueryFromShard = maxSeqNo - from - val hits: SearchHits = searchShard( - monitorCtx, - concreteIndexName, - shard, - prevSeqNo, - maxSeqNo, - null, - docIds, - fieldsToBeQueried - ) - transformedDocs.addAll( - transformSearchHitsAndReconstructDocs( - hits, - indexName, + while (numDocsLeftToQueryFromShard > 0) { + val to = from + if (numDocsLeftToQueryFromShard < 10000) numDocsLeftToQueryFromShard else 10000 + val hits: SearchHits = searchShard( + monitorCtx, concreteIndexName, - monitor.id, - conflictingFields, - docsSizeInBytes + shard, + from, + to, + null, + docIds, + fieldsToBeQueried ) - ) + from = to + 1 + numDocsLeftToQueryFromShard -= 10000 + val startTime = Instant.now() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexName, + concreteIndexName, + monitor.id, + conflictingFields, + docsSizeInBytes + ) + ) + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + transformedDocs, + docsSizeInBytes, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries + ) + } + logger.error( + "PERF_DEBUG: Transforming docs of shard [$indexName][$shard] " + + "took ${Instant.now().epochSecond - startTime.epochSecond}" + ) + } } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + @@ -685,22 +719,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) } - if ( - transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) - ) { - performPercolateQueryAndResetCounters( - monitorCtx, - transformedDocs, - docsSizeInBytes, - monitor, - monitorMetadata, - monitorInputIndices, - concreteIndices, - inputRunResults, - docsToQueries - ) - } } } @@ -751,7 +769,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } - /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + /** Executes search query on given shard of given index to fetch docs with sequence number greater than prevSeqNo. * This method hence fetches only docs from shard which haven't been queried before */ private suspend fun searchShard( @@ -787,7 +805,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) // fixme: use scroll to ensure all docs are covered, when number of queryable docs are greater than 10k ) - .preference(Preference.PRIMARY_FIRST.type()) if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") @@ -833,7 +850,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } val searchRequest = - SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) + SearchRequest().indices(*queryIndices.toTypedArray()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) @@ -863,6 +880,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}") return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 5c8886686..9bd923e22 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -108,6 +108,7 @@ object MonitorMetadataService : primaryTerm = response.primaryTerm ) } catch (e: Exception) { + log.error("Failed to upsert metadata", e) throw AlertingException.wrap(e) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index a884199f8..b8719b4b0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -257,11 +257,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } is Monitor -> { launch { + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) runJob(job, periodStart, periodEnd, false) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 42237853f..21e4d7077 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -395,7 +395,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } bulkResponse.forEach { bulkItemResponse -> if (bulkItemResponse.isFailed) { - log.debug(bulkItemResponse.failureMessage) + log.error("Failed to index doc level query for monitor $monitorId due to" + bulkItemResponse.failureMessage) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index cfed18c89..f0253858b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -60,7 +60,10 @@ object CompositeWorkflowRunner : WorkflowRunner() { val isTempWorkflow = dryRun || workflow.id == Workflow.NO_ID val executionId = generateExecutionId(isTempWorkflow, workflow) - + logger.debug( + "Workflow ${workflow.id} execution began at $workflowExecutionStartTime" + + " on node ${monitorCtx.clusterService!!.localNode().id}" + ) val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( workflow = workflow, skipIndex = isTempWorkflow, @@ -227,6 +230,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { ) } workflowRunResult.executionEndTime = Instant.now() + logger.debug("Workflow ${workflow.id} execution completed at $workflowRunResult.executionEndTime") return workflowRunResult } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt index b67f278b2..f5a267388 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt @@ -462,11 +462,13 @@ class JobSweeper( private fun isOwningNode(shardId: ShardId, jobId: JobId): Boolean { val localNodeId = clusterService.localNode().id - val shardNodeIds = clusterService.state().routingTable.shardRoutingTable(shardId) + val shardRoutingTable = clusterService.state().routingTable.shardRoutingTable(shardId) + val shardNodeIds = shardRoutingTable .filter { it.active() } .map { it.currentNodeId() } val shardNodes = ShardNodes(localNodeId, shardNodeIds) - return shardNodes.isOwningNode(jobId) + val owningNode = shardNodes.isOwningNode(jobId) + return owningNode } }