Skip to content

Commit

Permalink
blanket error loging
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 9, 2024
1 parent 058e7e2 commit 029bbe4
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
execute(DocLevelMonitorFanOutAction.INSTANCE, docLevelMonitorFanOutRequest1, it)
}
val lastRunContextFromResponse = dlmfor.lastRunContexts as MutableMap<String, MutableMap<String, Any>>
lastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap<String, Any>
logger.error(dlmfor)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,159 +129,164 @@ class TransportDocLevelMonitorFanOutAction
monitorCtx: MonitorRunnerExecutionContext,
listener: ActionListener<DocLevelMonitorFanOutResponse>,
) {
val monitor = request.monitor
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, Instant.now(), Instant.now())
// todo periodStart periodEnd
var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)
val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames
val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames
val monitorMetadata = request.monitorMetadata
val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val docsSizeInBytes = AtomicLong(0)
val shardIds = request.shardIds
val indexShardsMap: MutableMap<String, MutableList<Int>> = mutableMapOf()
val queryingStartTimeMillis = System.currentTimeMillis()
for (shardId in shardIds) {
if (indexShardsMap.containsKey(shardId.indexName)) {
indexShardsMap[shardId.indexName]!!.add(shardId.id)
} else {
indexShardsMap[shardId.indexName] = mutableListOf(shardId.id)
try {
val monitor = request.monitor
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, Instant.now(), Instant.now())
// todo periodStart periodEnd
var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)
val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames
val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames
val monitorMetadata = request.monitorMetadata
val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val docsSizeInBytes = AtomicLong(0)
val shardIds = request.shardIds
val indexShardsMap: MutableMap<String, MutableList<Int>> = mutableMapOf()
val queryingStartTimeMillis = System.currentTimeMillis()
for (shardId in shardIds) {
if (indexShardsMap.containsKey(shardId.indexName)) {
indexShardsMap[shardId.indexName]!!.add(shardId.id)
} else {
indexShardsMap[shardId.indexName] = mutableListOf(shardId.id)
}
}
}
val lastRunContext = mutableMapOf<String, MutableMap<String, Any>>()
InputRunResults
val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
val fieldsToBeQueried = mutableSetOf<String>()
for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${request.monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
val lastRunContext = mutableMapOf<String, MutableMap<String, Any>>()
InputRunResults
val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
val fieldsToBeQueried = mutableSetOf<String>()
for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
log.debug(
"Monitor ${request.monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}
for (entry in indexShardsMap) {
val indexExecutionContext =
request.indexExecutionContexts.stream()
.filter { it.concreteIndexName.equals(entry.key) }.findAny()
.get()
fetchShardDataAndMaybeExecutePercolateQueries(
request.monitor,
monitorCtx,
indexExecutionContext,
request.monitorMetadata,
inputRunResults,
docsToQueries,
transformedDocs,
docsSizeInBytes,
indexExecutionContext.updatedIndexNames,
indexExecutionContext.concreteIndexNames,
ArrayList(fieldsToBeQueried),
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried,
docTransformTimeTaken
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
for (entry in indexShardsMap) {
val indexExecutionContext =
request.indexExecutionContexts.stream()
.filter { it.concreteIndexName.equals(entry.key) }.findAny()
.get()
fetchShardDataAndMaybeExecutePercolateQueries(
request.monitor,
monitorCtx,
indexExecutionContext,
request.monitorMetadata,
inputRunResults,
docsToQueries,
transformedDocs,
docsSizeInBytes,
indexExecutionContext.updatedIndexNames,
indexExecutionContext.concreteIndexNames,
ArrayList(fieldsToBeQueried),
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried,
docTransformTimeTaken
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext
}
lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext
}

/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
the percolate query at the end */
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
updatedIndexNames,
concreteIndexNames,
inputRunResults,
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
val took = System.currentTimeMillis() - queryingStartTimeMillis
logger.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}")
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
the percolate query at the end */
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
updatedIndexNames,
concreteIndexNames,
inputRunResults,
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
val took = System.currentTimeMillis() - queryingStartTimeMillis
log.error("PERF_DEBUG_STAT: Entire query+percolate completed in $took millis in ${request.executionId}")
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
list of matched docId from inputRunResults>
this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
*/
queries.forEach {
if (inputRunResults.containsKey(it.id)) {
queryToDocIds[it] = inputRunResults[it.id]!!
}
}
}

val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }

val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (monitor.id != Monitor.NO_ID) {
logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (monitor.id != Monitor.NO_ID) {
log.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
false,
executionId = request.executionId,
workflowRunContext = request.workflowRunContext
)
}
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorCtx,
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
false,

// If any error happened during trigger execution, upsert monitor error alert
val errorMessage =
constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = request.executionId,
workflowRunContext = request.workflowRunContext
request.workflowRunContext
)
} else {
onSuccessfulMonitorRun(monitorCtx, monitor)
}
}

// If any error happened during trigger execution, upsert monitor error alert
val errorMessage =
constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = request.executionId,
request.workflowRunContext
listener.onResponse(
DocLevelMonitorFanOutResponse(
nodeId = monitorCtx.clusterService!!.localNode().id,
executionId = request.executionId,
monitorId = monitor.id,
shardIdFailureMap = emptyMap(),
findingIds = emptyList(),
lastRunContext as MutableMap<String, Any>,
InputRunResults(listOf(inputRunResults)),
triggerResults
)
)
} else {
onSuccessfulMonitorRun(monitorCtx, monitor)
} catch (e: Exception) {
log.error("${request.monitor.id} Failed to run fan_out on node ${monitorCtx.clusterService.localNode().id} due to error",e)
listener.onFailure(e)
}
listener.onResponse(
DocLevelMonitorFanOutResponse(
nodeId = monitorCtx.clusterService!!.localNode().id,
executionId = request.executionId,
monitorId = monitor.id,
shardIdFailureMap = emptyMap(),
findingIds = emptyList(),
lastRunContext as MutableMap<String, Any>,
InputRunResults(listOf(inputRunResults)),
triggerResults
)
)
}

private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
Expand Down Expand Up @@ -349,7 +354,7 @@ class TransportDocLevelMonitorFanOutAction
val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
logger.debug("Findings: $findingStr")
log.debug("Findings: $findingStr")

if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
Expand All @@ -367,11 +372,11 @@ class TransportDocLevelMonitorFanOutAction
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
log.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
log.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}
return findingDocPairs
Expand Down Expand Up @@ -465,7 +470,7 @@ class TransportDocLevelMonitorFanOutAction
docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime)
}
} catch (e: Exception) {
logger.error(
log.error(
"Monitor ${monitor.id} :" +
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
Expand Down Expand Up @@ -562,7 +567,7 @@ class TransportDocLevelMonitorFanOutAction
val message =
"Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" +
" sourceIndices: $monitorInputIndices"
logger.error(message)
log.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
Expand All @@ -573,7 +578,7 @@ class TransportDocLevelMonitorFanOutAction
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
logger.debug(
log.debug(
"Monitor ${monitor.id}: " +
"Executing percolate query for docs from source indices " +
"$monitorInputIndices against query index $queryIndices"
Expand All @@ -598,8 +603,8 @@ class TransportDocLevelMonitorFanOutAction
"Response status is ${response.status()}"
)
}
logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
logger.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response")
log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
log.error("Monitor ${monitor.id} PERF_DEBUG: Percolate query response = $response")
percolateQueriesTimeTaken.getAndAdd(response.took.millis)
return response.hits
}
Expand Down Expand Up @@ -643,7 +648,7 @@ class TransportDocLevelMonitorFanOutAction
TransformedDocDto(index, concreteIndex, hit.id, sourceRef)
)
} catch (e: Exception) {
logger.error(
log.error(
"Monitor $monitorId: Failed to transform payload $hit for percolate query",
e
)
Expand Down Expand Up @@ -1052,3 +1057,4 @@ class TransportDocLevelMonitorFanOutAction
return NotificationActionConfigs(destination, channel)
}
}
}

0 comments on commit 029bbe4

Please sign in to comment.