Skip to content

Commit

Permalink
collate input run results from fan out responses
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 24, 2024
1 parent e68ea5b commit ced398d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.action.ActionListener
Expand Down Expand Up @@ -317,6 +318,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
cont.resume(response)
val hasIndex =
monitorCtx.clusterService!!.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)
logger.error("exists check")
}

override fun onFailure(e: Exception) {
Expand Down Expand Up @@ -362,12 +366,28 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses)
val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses)
if (!isTempMonitor) {
// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = executionId,
workflowRunContext
)
} else {
onSuccessfulMonitorRun(monitorCtx, monitor)
}
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses))
return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults)
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
Expand All @@ -392,9 +412,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

private fun buildInputRunResults(docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>): InputRunResults {
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
var error: Exception? = null
for (response in docLevelMonitorFanOutResponses) {
if (error == null && response.inputResults.error != null)
error = response.inputResults.error

val partialResult = response.inputResults.results
for (result in partialResult) {
for (id in result.keys) {
inputRunResults.getOrPut(id) { mutableSetOf() }.addAll(result[id] as Collection<String>)
}
}
}
return InputRunResults(listOf(inputRunResults), error)
}

private fun buildTriggerResults(
docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>,
): Map<String, DocumentLevelTriggerRunResult> {
): MutableMap<String, DocumentLevelTriggerRunResult> {
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
for (res in docLevelMonitorFanOutResponses) {
for (triggerId in res.triggerResults.keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
assertIndexNotExists(SCHEDULED_JOBS_INDEX)
var executeMonitorResponse = executeMonitor(monitor, null)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
Expand Down

0 comments on commit ced398d

Please sign in to comment.