Skip to content

Commit

Permalink
add stats
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 2, 2024
1 parent 743bda8 commit d324f8d
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -86,7 +88,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
Expand Down Expand Up @@ -208,7 +212,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
concreteIndexName,
nonPercolateSearchesTimeTaken
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand Down Expand Up @@ -265,7 +270,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsSizeInBytes,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
ArrayList(fieldsToBeQueried),
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
}
Expand All @@ -281,7 +289,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
updatedIndexNames,
concreteIndicesSeenSoFar,
inputRunResults,
docsToQueries
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
val took = System.currentTimeMillis() - queryingStartTimeMillis
Expand All @@ -305,11 +315,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor $monitor.id")
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
Expand Down Expand Up @@ -363,6 +370,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
e
)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
} finally {
logger.error(
"PERF_DEBUG_STATS: Monitor ${monitor.id} " +
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
)
logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Time spent on percolate queries in millis: $percolateQueriesTimeTaken")
logger.error("PERF_DEBUG_STATS: Monitor ${monitor.id} Num docs queried: $totalDocsQueried")
}
}

Expand Down Expand Up @@ -401,7 +415,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
trigger: DocumentLevelTrigger,
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
Expand All @@ -410,35 +424,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

val findings = mutableListOf<String>()
val findingDocPairs = mutableListOf<Pair<String, String>>()
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)
findings.add(findingId)
val findingToDocPairs = createFindings(
monitor,
monitorCtx,
docsToQueries,
idQueryMap,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)

if (triggerResult.triggeredDocs.contains(it.key)) {
findingDocPairs.add(Pair(findingId, it.key))
findingToDocPairs.forEach {
// Only pick those entries whose docs have triggers associated with them
if (triggerResult.triggeredDocs.contains(it.second)) {
triggerFindingDocPairs.add(Pair(it.first, it.second))
}
}

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
relatedFindings = findings,
relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)

val alerts = mutableListOf<Alert>()
findingDocPairs.forEach {
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
listOf(it.second),
Expand Down Expand Up @@ -500,47 +512,65 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
docsToQueries: MutableMap<String, MutableList<String>>,
idQueryMap: Map<String, DocLevelQuery>,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
): List<Pair<String, String>> {

val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

if (shouldCreateFinding) {
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
// Before the "|" is the doc id and after the "|" is the index
val docIndex = it.key.split("|")

monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
monitorCtx.client!!.index(indexRequest, it)
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = triggeredQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
logger.debug("Findings: $findingStr")

if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
.opType(DocWriteRequest.OpType.CREATE)
}
}

try {
publishFinding(monitor, monitorCtx, finding)
} catch (e: Exception) {
// suppress exception
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}
return finding.id
return findingDocPairs
}

private fun publishFinding(
Expand All @@ -563,13 +593,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String
index: String,
nonPercolateSearchesTimeTaken: AtomicLong
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken)
updatedLastRunContext[shard] = maxSeqNo.toString()
}
return updatedLastRunContext
Expand Down Expand Up @@ -606,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand All @@ -625,7 +656,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
if (response.hits.hits.isEmpty())
return -1L

nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
return response.hits.hits[0].seqNo
}

Expand Down Expand Up @@ -654,7 +685,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsSizeInBytes: AtomicLong,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>
fieldsToBeQueried: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -678,7 +712,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
to,
null,
docIds,
fieldsToBeQueried
fieldsToBeQueried,
nonPercolateSearchesTimeTaken
)
from = to + 1
numDocsLeftToQueryFromShard -= 10000
Expand Down Expand Up @@ -706,7 +741,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
logger.error(
Expand Down Expand Up @@ -743,6 +780,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
Expand All @@ -751,7 +790,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor,
monitorMetadata,
concreteIndices,
monitorInputIndices
monitorInputIndices,
percolateQueriesTimeTaken
)

percolateQueryResponseHits.forEach { hit ->
Expand All @@ -765,6 +805,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
totalDocsQueried.getAndAdd(transformedDocs.size.toLong())
} finally { // no catch block because exception is caught and handled in runMonitor() class
transformedDocs.clear()
docsSizeInBytes.set(0)
Expand All @@ -783,6 +824,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
query: String?,
docIds: List<String>? = null,
fieldsToFetch: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand Down Expand Up @@ -819,6 +861,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
return response.hits
}

Expand All @@ -830,6 +873,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
concreteIndices: List<String>,
monitorInputIndices: List<String>,
percolateQueriesTimeTaken: AtomicLong
): SearchHits {
val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices))
Expand Down Expand Up @@ -881,8 +925,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
"Response status is ${response.status()}"
)
}
logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query hits = ${response.hits}")
logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response")
percolateQueriesTimeTaken.getAndAdd(response.took.millis)
return response.hits
}
/** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/
Expand All @@ -907,7 +952,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only")
constructSourceMapFromFieldsInHit(hit)
}
transformDocumentFieldNames(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ class TransportIndexMonitorAction @Inject constructor(
else (it as DocLevelMonitorInput).indices
indices.addAll(inputIndices)
}
if (
indices.size == 1 && (
IndexUtils.isAlias(indices[0], clusterService.state()) ||
IndexUtils.isDataStream(indices[0], clusterService.state())
)
) {
val metadata = clusterService.state().metadata.indicesLookup[indices[0]]?.writeIndex
if (metadata != null) {
indices.removeAt(0)
indices.add(metadata.index.name)
}
}
val searchRequest = SearchRequest().indices(*indices.toTypedArray())
.source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
client.search(
Expand Down

0 comments on commit d324f8d

Please sign in to comment.