diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e53187c7b..53b95a126 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.opensearch.action.ActionRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutAction import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction import org.opensearch.alerting.transport.TransportDeleteWorkflowAction +import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction import org.opensearch.alerting.transport.TransportExecuteWorkflowAction import org.opensearch.alerting.transport.TransportGetAlertsAction @@ -211,7 +213,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), - ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) + ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), + ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java) ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index bb74b7972..ec421123e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -16,7 +16,9 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutAction import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults @@ -283,8 +285,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ), workflowRunContext ) + val dlmfor: DocLevelMonitorFanOutResponse = monitorCtx.client!!.suspendUntil { + execute(DocLevelMonitorFanOutAction.INSTANCE, docLevelMonitorFanOutRequest1, it) + } + val lastRunContextFromResponse = dlmfor.lastRunContexts as MutableMap> + lastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap + logger.error(dlmfor) } } + MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 68e9e59f3..3b17ceebe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -21,7 +21,6 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool -import org.opensearch.transport.TransportService data class MonitorRunnerExecutionContext( @@ -39,7 +38,6 @@ data class MonitorRunnerExecutionContext( var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, - val transportService: TransportService, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt index 60da3b385..2a6afd63b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -40,8 +40,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { this.indexExecutionContexts = indexExecutionContexts this.shardIds = shardIds this.workflowRunContext = workflowRunContext - require(shardIds.isEmpty()) { } - require(indexExecutionContexts.isEmpty()) { } + require(false == shardIds.isEmpty()) { } + require(false == indexExecutionContexts.isEmpty()) { } } @Throws(IOException::class) @@ -69,16 +69,17 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { workflowRunContext?.writeTo(out) } - override fun validate(): ActionRequestValidationException { + override fun validate(): ActionRequestValidationException? { var actionValidationException: ActionRequestValidationException? = null if (shardIds.isEmpty()) { actionValidationException = ActionRequestValidationException() actionValidationException.addValidationError("shard_ids is null or empty") } - if (indexExecutionContexts.isEmpty()) + if (indexExecutionContexts.isEmpty()) { if (actionValidationException == null) actionValidationException = ActionRequestValidationException() - actionValidationException!!.addValidationError("index_execution_contexts is null or empty") + actionValidationException.addValidationError("index_execution_contexts is null or empty") + } return actionValidationException } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt index 87e2361ea..53f25c177 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt @@ -19,7 +19,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { val findingIds: List // for shards not delegated to nodes sequence number would be -3 (new number shard was not queried), - val lastRunContexts: Map // partial + val lastRunContexts: MutableMap // partial val inputResults: InputRunResults // partial val triggerResults: Map // partial @@ -30,7 +30,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { monitorId = sin.readString(), shardIdFailureMap = sin.readMap() as Map, findingIds = sin.readStringList(), - lastRunContexts = sin.readMap()!! as Map, + lastRunContexts = sin.readMap()!! as MutableMap, inputResults = InputRunResults.readFrom(sin), triggerResults = MonitorRunResult.suppressWarning(sin.readMap()) as Map // triggerResults ) @@ -41,7 +41,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { monitorId: String, shardIdFailureMap: Map, findingIds: List, - lastRunContexts: Map, + lastRunContexts: MutableMap, inputResults: InputRunResults = InputRunResults(), // partial, triggerResults: Map = mapOf(), ) : super() { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 1f5e70276..8b9671d1f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -18,6 +18,7 @@ import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.MonitorRunnerExecutionContext import org.opensearch.alerting.MonitorRunnerService +import org.opensearch.alerting.action.DocLevelMonitorFanOutAction import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse import org.opensearch.alerting.action.GetDestinationsAction @@ -52,12 +53,10 @@ import org.opensearch.alerting.util.use import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.client.node.NodeClient -import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DocLevelMonitorInput @@ -74,7 +73,6 @@ import org.opensearch.core.action.ActionListener import org.opensearch.core.common.Strings import org.opensearch.core.common.bytes.BytesReference import org.opensearch.core.rest.RestStatus -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.index.query.BoolQueryBuilder @@ -103,12 +101,10 @@ class TransportDocLevelMonitorFanOutAction transportService: TransportService, val client: Client, val actionFilters: ActionFilters, - val clusterService: ClusterService, + val runner: MonitorRunnerService, val settings: Settings, - val xContentRegistry: NamedXContentRegistry, - val monitorCtx: MonitorRunnerExecutionContext, ) : HandledTransportAction( - AlertingActions.INDEX_MONITOR_ACTION_NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest + DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest ), SecureTransportAction { @@ -120,7 +116,11 @@ class TransportDocLevelMonitorFanOutAction listener: ActionListener, ) { scope.launch { - executeMonitor(request, monitorCtx, listener) + executeMonitor( + request, + monitorCtx = runner.monitorCtx, + listener = listener + ) } } @@ -154,6 +154,7 @@ class TransportDocLevelMonitorFanOutAction indexShardsMap[shardId.indexName] = mutableListOf(shardId.id) } } + val lastRunContext = mutableMapOf>() InputRunResults val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput val queries: List = docLevelMonitorInput.queries @@ -195,7 +196,9 @@ class TransportDocLevelMonitorFanOutAction ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo } + lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit the percolate query at the end */ if (transformedDocs.isNotEmpty()) { @@ -269,12 +272,12 @@ class TransportDocLevelMonitorFanOutAction } listener.onResponse( DocLevelMonitorFanOutResponse( - nodeId = clusterService.localNode().id, + nodeId = monitorCtx.clusterService!!.localNode().id, executionId = request.executionId, monitorId = monitor.id, shardIdFailureMap = emptyMap(), findingIds = emptyList(), - request.indexExecutionContexts[0].updatedLastRunContext, + lastRunContext as MutableMap, InputRunResults(listOf(inputRunResults)), triggerResults )