From 27322c689ac1e8bf13be4ae1d71ec7a1675b4e2e Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 8 Feb 2024 18:19:56 -0800 Subject: [PATCH] fan out monitor request, response, action --- .../alerting/BucketLevelMonitorRunner.kt | 2 +- .../alerting/DocumentLevelMonitorRunner.kt | 6 ++ .../action/DocLevelMonitorFanOutAction.kt | 10 +++ .../action/DocLevelMonitorFanOutRequest.kt | 79 +++++++++++++++++++ .../action/DocLevelMonitorFanOutResponse.kt | 60 ++++++++++++++ .../alerting/model/IndexExecutionContext.kt | 55 ++++++++++++- 6 files changed, 207 insertions(+), 5 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 9268f5616..bf7f5953c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -52,7 +52,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant import java.util.UUID -//TODO raise PR for bucket level monitor optimization also. dont miss +// TODO raise PR for bucket level monitor optimization also. dont miss object BucketLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index daeb22945..a1b62abce 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -262,6 +262,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), ) + // map + // build DocLevelMonitorFanOutRequest + // groupedlistener + // monitorCtx.client.send request parallel calls fetchShardDataAndMaybeExecutePercolateQueries( monitor, @@ -359,6 +363,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { "Calling upsertMetadata function from ${monitorCtx.clusterService!!.localNode().id} in " + "execution $executionId" ) + // construct metadata from all nodes' fanout + // response MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt new file mode 100644 index 000000000..f1a1ff2c8 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutAction.kt @@ -0,0 +1,10 @@ +package org.opensearch.alerting.action + +import org.opensearch.action.ActionType + +class DocLevelMonitorFanOutAction private constructor() : ActionType(NAME, ::DocLevelMonitorFanOutResponse) { + companion object { + val INSTANCE = DocLevelMonitorFanOutAction() + const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt new file mode 100644 index 000000000..3105b6446 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -0,0 +1,79 @@ +package org.opensearch.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.alerting.model.IndexExecutionContext +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + + val nodeId: String + val monitorId: String + val executionId: String + val indexExecutionContexts: List + val shardIds: List + + constructor( + nodeId: String, + monitorId: String, + executionId: String, + indexExecutionContexts: List, + shardIds: List, + ) : super() { + this.nodeId = nodeId + this.monitorId = monitorId + this.executionId = executionId + this.indexExecutionContexts = indexExecutionContexts + this.shardIds = shardIds + require(shardIds.isEmpty()) { } + require(indexExecutionContexts.isEmpty()) { } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + nodeId = sin.readString(), + monitorId = sin.readString(), + executionId = sin.readString(), + indexExecutionContexts = sin.readList { IndexExecutionContext(sin) }, + shardIds = sin.readList(::ShardId) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(nodeId) + out.writeString(monitorId) + out.writeString(executionId) + out.writeCollection(indexExecutionContexts) + out.writeCollection(shardIds) + } + + override fun validate(): ActionRequestValidationException? { + var actionValidationException: ActionRequestValidationException? = null + if (shardIds.isEmpty()) { + actionValidationException = ActionRequestValidationException() + actionValidationException.addValidationError("shard_ids is null or empty") + } + if (indexExecutionContexts.isEmpty()) + if (actionValidationException == null) + actionValidationException = ActionRequestValidationException() + actionValidationException!!.addValidationError("index_execution_contexts is null or empty") + return actionValidationException + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("node_id", nodeId) + .field("monitor_id", nodeId) + .field("execution_id", nodeId) + .field("index_execution_contexts", indexExecutionContexts) + .field("shard_ids", shardIds) + return builder.endObject() + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt new file mode 100644 index 000000000..b3a983ab4 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutResponse.kt @@ -0,0 +1,60 @@ +package org.opensearch.alerting.action + +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { + + val nodeId: String + val executionId: String + val monitorId: String + val shardIdFailureMap: Map + val findingIds: List + // for shards not delegated to nodes sequence number would be -3 (new number shard was not queried), + val lastRunContexts: Map + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readString(), + sin.readMap() as Map, + sin.readStringList(), + sin.readMap()!! as Map, + ) + + constructor( + nodeId: String, + executionId: String, + monitorId: String, + shardIdFailureMap: Map, + findingIds: List, + lastRunContexts: Map, + ) : super() { + this.nodeId = nodeId + this.executionId = executionId + this.monitorId = monitorId + this.shardIdFailureMap = shardIdFailureMap + this.findingIds = findingIds + this.lastRunContexts = lastRunContexts + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeMap(lastRunContexts) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("last_run_contexts", lastRunContexts) + .endObject() + return builder + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt index 97156eb96..d09ae3283 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -6,13 +6,60 @@ package org.opensearch.alerting.model import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException data class IndexExecutionContext( val queries: List, - val lastRunContext: MutableMap, - val updatedLastRunContext: MutableMap, + val lastRunContext: MutableMap, // previous execution + val updatedLastRunContext: MutableMap, // without sequence numbers val indexName: String, val concreteIndexName: String, val conflictingFields: List, - val docIds: List? = null -) + val docIds: List? = emptyList(), +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + queries = sin.readList { DocLevelQuery(sin) }, + lastRunContext = sin.readMap(), + updatedLastRunContext = sin.readMap(), + indexName = sin.readString(), + concreteIndexName = sin.readString(), + conflictingFields = sin.readStringList(), + docIds = sin.readStringList() + ) + + override fun writeTo(out: StreamOutput?) { + out!!.writeCollection(queries) + out.writeMap(lastRunContext) + out.writeMap(updatedLastRunContext) + out.writeString(indexName) + out.writeString(concreteIndexName) + out.writeStringCollection(conflictingFields) + out.writeOptionalStringCollection(docIds) + } + + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + builder!!.startObject() + .field("queries", queries) + .field("last_run_context", lastRunContext) + .field("updated_last_run_context", updatedLastRunContext) + .field("index_name", indexName) + .field("concrete_index_name", concreteIndexName) + .field("conflicting_fields", conflictingFields) + .field("doc_ids", docIds) + .endObject() + return builder + } + + companion object { + fun readFrom(sin: StreamInput): List { + TODO("Not yet implemented") + } + } +}