Skip to content

Commit

Permalink
fix fan out action bugs wip
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 9, 2024
1 parent 920afc8 commit 058e7e2
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand Down Expand Up @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
Expand Down Expand Up @@ -211,7 +213,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
Expand Down Expand Up @@ -283,8 +285,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
),
workflowRunContext
)
val dlmfor: DocLevelMonitorFanOutResponse = monitorCtx.client!!.suspendUntil {
execute(DocLevelMonitorFanOutAction.INSTANCE, docLevelMonitorFanOutRequest1, it)
}
val lastRunContextFromResponse = dlmfor.lastRunContexts as MutableMap<String, MutableMap<String, Any>>
lastRunContext[concreteIndexName] = lastRunContextFromResponse[concreteIndexName] as MutableMap<String, Any>
logger.error(dlmfor)
}
}

MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService

data class MonitorRunnerExecutionContext(

Expand All @@ -39,7 +38,6 @@ data class MonitorRunnerExecutionContext(
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,
val transportService: TransportService,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
this.indexExecutionContexts = indexExecutionContexts
this.shardIds = shardIds
this.workflowRunContext = workflowRunContext
require(shardIds.isEmpty()) { }
require(indexExecutionContexts.isEmpty()) { }
require(false == shardIds.isEmpty()) { }
require(false == indexExecutionContexts.isEmpty()) { }
}

@Throws(IOException::class)
Expand Down Expand Up @@ -69,16 +69,17 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
workflowRunContext?.writeTo(out)
}

override fun validate(): ActionRequestValidationException {
override fun validate(): ActionRequestValidationException? {
var actionValidationException: ActionRequestValidationException? = null
if (shardIds.isEmpty()) {
actionValidationException = ActionRequestValidationException()
actionValidationException.addValidationError("shard_ids is null or empty")
}
if (indexExecutionContexts.isEmpty())
if (indexExecutionContexts.isEmpty()) {
if (actionValidationException == null)
actionValidationException = ActionRequestValidationException()
actionValidationException!!.addValidationError("index_execution_contexts is null or empty")
actionValidationException.addValidationError("index_execution_contexts is null or empty")
}
return actionValidationException
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
val findingIds: List<String>

// for shards not delegated to nodes sequence number would be -3 (new number shard was not queried),
val lastRunContexts: Map<String, Any> // partial
val lastRunContexts: MutableMap<String, Any> // partial
val inputResults: InputRunResults // partial
val triggerResults: Map<String, DocumentLevelTriggerRunResult> // partial

Expand All @@ -30,7 +30,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
monitorId = sin.readString(),
shardIdFailureMap = sin.readMap() as Map<String, Exception>,
findingIds = sin.readStringList(),
lastRunContexts = sin.readMap()!! as Map<String, Any>,
lastRunContexts = sin.readMap()!! as MutableMap<String, Any>,
inputResults = InputRunResults.readFrom(sin),
triggerResults = MonitorRunResult.suppressWarning(sin.readMap()) as Map<String, DocumentLevelTriggerRunResult> // triggerResults
)
Expand All @@ -41,7 +41,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
monitorId: String,
shardIdFailureMap: Map<String, Exception>,
findingIds: List<String>,
lastRunContexts: Map<String, Any>,
lastRunContexts: MutableMap<String, Any>,
inputResults: InputRunResults = InputRunResults(), // partial,
triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(),
) : super() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.action.GetDestinationsAction
Expand Down Expand Up @@ -52,12 +53,10 @@ import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -74,7 +73,6 @@ import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.Strings
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.query.BoolQueryBuilder
Expand Down Expand Up @@ -103,12 +101,10 @@ class TransportDocLevelMonitorFanOutAction
transportService: TransportService,
val client: Client,
val actionFilters: ActionFilters,
val clusterService: ClusterService,
val runner: MonitorRunnerService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val monitorCtx: MonitorRunnerExecutionContext,
) : HandledTransportAction<DocLevelMonitorFanOutRequest, DocLevelMonitorFanOutResponse>(
AlertingActions.INDEX_MONITOR_ACTION_NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
DocLevelMonitorFanOutAction.NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest
),
SecureTransportAction {

Expand All @@ -120,7 +116,11 @@ class TransportDocLevelMonitorFanOutAction
listener: ActionListener<DocLevelMonitorFanOutResponse>,
) {
scope.launch {
executeMonitor(request, monitorCtx, listener)
executeMonitor(
request,
monitorCtx = runner.monitorCtx,
listener = listener
)
}
}

Expand Down Expand Up @@ -154,6 +154,7 @@ class TransportDocLevelMonitorFanOutAction
indexShardsMap[shardId.indexName] = mutableListOf(shardId.id)
}
}
val lastRunContext = mutableMapOf<String, MutableMap<String, Any>>()
InputRunResults
val docLevelMonitorInput = request.monitor.inputs[0] as DocLevelMonitorInput
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
Expand Down Expand Up @@ -195,7 +196,9 @@ class TransportDocLevelMonitorFanOutAction
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
lastRunContext[indexExecutionContext.concreteIndexName] = indexExecutionContext.updatedLastRunContext
}

/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
the percolate query at the end */
if (transformedDocs.isNotEmpty()) {
Expand Down Expand Up @@ -269,12 +272,12 @@ class TransportDocLevelMonitorFanOutAction
}
listener.onResponse(
DocLevelMonitorFanOutResponse(
nodeId = clusterService.localNode().id,
nodeId = monitorCtx.clusterService!!.localNode().id,
executionId = request.executionId,
monitorId = monitor.id,
shardIdFailureMap = emptyMap(),
findingIds = emptyList(),
request.indexExecutionContexts[0].updatedLastRunContext,
lastRunContext as MutableMap<String, Any>,
InputRunResults(listOf(inputRunResults)),
triggerResults
)
Expand Down

0 comments on commit 058e7e2

Please sign in to comment.