From 89ebb3ff6bc0b576ab424a5ffd6d0317e8c9fe4e Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 8 Feb 2024 22:11:40 -0800 Subject: [PATCH] add monitor metadata and monitor objects to fan out request to avoid disk seeks Signed-off-by: Surya Sashank Nistala --- .../action/DocLevelMonitorFanOutRequest.kt | 19 ++++--- .../TransportDocLevelMonitorFanOutAction.kt | 51 +++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt index 3105b6446..33ef42564 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -3,6 +3,8 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException import org.opensearch.alerting.model.IndexExecutionContext +import org.opensearch.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.index.shard.ShardId @@ -14,20 +16,23 @@ import java.io.IOException class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { val nodeId: String - val monitorId: String + val monitor: Monitor + val monitorMetadata: MonitorMetadata val executionId: String val indexExecutionContexts: List val shardIds: List constructor( nodeId: String, - monitorId: String, + monitor: Monitor, + monitorMetadata: MonitorMetadata, executionId: String, indexExecutionContexts: List, shardIds: List, ) : super() { this.nodeId = nodeId - this.monitorId = monitorId + this.monitor = monitor + this.monitorMetadata = monitorMetadata this.executionId = executionId this.indexExecutionContexts = indexExecutionContexts this.shardIds = shardIds @@ -38,7 +43,8 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( nodeId = sin.readString(), - monitorId = sin.readString(), + monitor = Monitor.readFrom(sin)!!, + monitorMetadata = MonitorMetadata.readFrom(sin), executionId = sin.readString(), indexExecutionContexts = sin.readList { IndexExecutionContext(sin) }, shardIds = sin.readList(::ShardId) @@ -47,13 +53,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(nodeId) - out.writeString(monitorId) + monitor.writeTo(out) + monitorMetadata.writeTo(out) out.writeString(executionId) out.writeCollection(indexExecutionContexts) out.writeCollection(shardIds) } - override fun validate(): ActionRequestValidationException? { + override fun validate(): ActionRequestValidationException { var actionValidationException: ActionRequestValidationException? = null if (shardIds.isEmpty()) { actionValidationException = ActionRequestValidationException() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt new file mode 100644 index 000000000..fb46a7aeb --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -0,0 +1,51 @@ +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.MonitorRunnerExecutionContext +import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest +import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +private val log = LogManager.getLogger(TransportDocLevelMonitorFanOutAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportDocLevelMonitorFanOutAction @Inject constructor( + transportService: TransportService, + val client: Client, + val actionFilters: ActionFilters, + val clusterService: ClusterService, + val settings: Settings, + val xContentRegistry: NamedXContentRegistry, + val monitorCtx: MonitorRunnerExecutionContext, + +) : HandledTransportAction( + AlertingActions.INDEX_MONITOR_ACTION_NAME, transportService, actionFilters, ::DocLevelMonitorFanOutRequest +), + SecureTransportAction { + + @Volatile + override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + override fun doExecute( + task: Task, + request: DocLevelMonitorFanOutRequest, + listener: ActionListener, + ) { + executeMonitor(request, monitorCtx) + } + + private fun executeMonitor(request: DocLevelMonitorFanOutRequest, monitorCtx: MonitorRunnerExecutionContext) { + } +}