diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index d1b90c20a..dabd3069d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -40,6 +40,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry @@ -187,8 +188,10 @@ object MonitorMetadataService : suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata { try { - val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value) (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + (monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0] else null val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap>) @@ -210,8 +213,10 @@ object MonitorMetadataService : createWithRunContext: Boolean, workflowMetadataId: String? = null, ): MonitorMetadata { - val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value) (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) + (monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0] else null val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)) createFullRunContext(monitorIndex) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt index 666538b06..c5ad421df 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt @@ -23,6 +23,7 @@ import org.opensearch.commons.alerting.model.InputRunResults import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus @@ -64,7 +65,8 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() else monitorMetadata.lastRunContext.toMutableMap() as MutableMap> - val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput + val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput var shards: Set = mutableSetOf() var concreteIndices = listOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 36f702acc..4b49cbd1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -63,6 +63,8 @@ import org.opensearch.commons.alerting.model.MonitorMetadata import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.commons.authuser.User @@ -194,10 +196,15 @@ class TransportIndexMonitorAction @Inject constructor( ) { val indices = mutableListOf() // todo: for doc level alerting: check if index is present before monitor is created. - val searchInputs = request.monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD || it.name() == DOC_LEVEL_INPUT_FIELD } + val searchInputs = request.monitor.inputs.filter { + it.name() == SearchInput.SEARCH_FIELD || + it.name() == DOC_LEVEL_INPUT_FIELD || + it.name() == REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD + } searchInputs.forEach { val inputIndices = if (it.name() == SearchInput.SEARCH_FIELD) (it as SearchInput).indices - else (it as DocLevelMonitorInput).indices + else if (it.name() == DOC_LEVEL_INPUT_FIELD) (it as DocLevelMonitorInput).indices + else (it as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices indices.addAll(inputIndices) } val updatedIndices = indices.map { index -> diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 65b433d6b..a8b5c57de 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -6,6 +6,7 @@ package org.opensearch.alerting; import org.opensearch.action.support.WriteRequest; +import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput; import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1; import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2; import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1; @@ -22,6 +23,7 @@ import org.opensearch.commons.alerting.model.Monitor; import org.opensearch.commons.alerting.model.action.Action; import org.opensearch.commons.alerting.model.action.Throttle; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput; import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput; import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger; import org.opensearch.core.action.ActionListener; @@ -206,6 +208,15 @@ public void onFailure(Exception e) { ); }; } else { + SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput = + new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2); + BytesStreamOutput out2 = new BytesStreamOutput(); + sampleRemoteDocLevelMonitorInput.writeTo(out2); + BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes(); + + DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList()); + RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput); + Monitor remoteDocLevelMonitor = new Monitor( Monitor.NO_ID, Monitor.NO_VERSION, @@ -217,7 +228,7 @@ public void onFailure(Exception e) { SampleRemoteMonitorPlugin.SAMPLE_REMOTE_DOC_LEVEL_MONITOR, null, 0, - List.of(new DocLevelMonitorInput("description", List.of("index"), emptyList())), + List.of(remoteDocLevelMonitorInput), List.of(), Map.of(), new DataSources(), diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java index 0db51fa4e..71a16c718 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java @@ -10,6 +10,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.WriteRequest; +import org.opensearch.alerting.monitor.inputs.SampleRemoteDocLevelMonitorInput; import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -17,9 +18,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest; import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse; +import org.opensearch.commons.alerting.model.DocLevelMonitorInput; import org.opensearch.commons.alerting.model.InputRunResults; import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -55,30 +60,42 @@ public TransportRemoteDocLevelMonitorFanOutAction( @Override protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, ActionListener actionListener) { - Monitor monitor = request.getMonitor(); - Map lastRunContext = request.getMonitorMetadata().getLastRunContext(); - ((Map) lastRunContext.get("index")).put("0", 0); - IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX) - .source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - this.client.index(indexRequest, new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse( - clusterService.localNode().getId(), - request.getExecutionId(), - monitor.getId(), - lastRunContext, - new InputRunResults(), - new HashMap<>(), - null - ); - actionListener.onResponse(response); - } + try { + Monitor monitor = request.getMonitor(); + Map lastRunContext = request.getMonitorMetadata().getLastRunContext(); - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }); + RemoteDocLevelMonitorInput input = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0); + BytesReference customInputSerialized = input.getInput(); + StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes); + SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput = new SampleRemoteDocLevelMonitorInput(sin); + DocLevelMonitorInput docLevelMonitorInput = input.getDocLevelMonitorInput(); + String index = docLevelMonitorInput.getIndices().get(0); + + ((Map) lastRunContext.get(index)).put("0", 0); + IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX) + .source(sampleRemoteDocLevelMonitorInput.getB()).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + this.client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse indexResponse) { + DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse( + clusterService.localNode().getId(), + request.getExecutionId(), + monitor.getId(), + lastRunContext, + new InputRunResults(), + new HashMap<>(), + null + ); + actionListener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + actionListener.onFailure(e); + } + }); + } catch (Exception ex) { + actionListener.onFailure(ex); + } } } \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteDocLevelMonitorInput.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteDocLevelMonitorInput.java new file mode 100644 index 000000000..ecd3234ff --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteDocLevelMonitorInput.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.monitor.inputs; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; + +public class SampleRemoteDocLevelMonitorInput implements Writeable { + + private String a; + + private Map b; + + private int c; + + public SampleRemoteDocLevelMonitorInput(String a, Map b, int c) { + this.a = a; + this.b = b; + this.c = c; + } + + public SampleRemoteDocLevelMonitorInput(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readMap(), + sin.readInt() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(a); + out.writeMap(b); + out.writeInt(c); + } + + public int getC() { + return c; + } + + public Map getB() { + return b; + } + + public String getA() { + return a; + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java index d5f2ce486..f38462361 100644 --- a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java +++ b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java @@ -167,7 +167,9 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc LoggingDeprecationHandler.INSTANCE, searchResponse.getEntity().getContent() ).map(); - found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1); + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("world") && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("world").toString().equals("1")); return found.get(); } catch (IOException ex) { return false;