diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5d5f8dbb4..903f2ecc7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -245,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules **/ val localNode = monitorCtx.clusterService!!.localNode() - val nodeMap: Map = if (monitor?.fanoutEnabled == true) { + val nodeMap: Map = if (docLevelMonitorInput?.fanoutEnabled == true) { getNodes(monitorCtx) } else { logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index a769fc7af..f6ef9309a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2765,7 +2765,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery)) + val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false) val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) val monitor = createMonitor( @@ -2773,8 +2773,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { inputs = listOf(docLevelInput), triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), enabled = true, - schedule = IntervalSchedule(1, ChronoUnit.MINUTES), - fanoutEnabled = false + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) ) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 77cf6a538..2330974f4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -227,26 +227,6 @@ fun randomDocumentLevelMonitor( ) } -fun randomDocumentLevelMonitor( - name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - user: User? = randomUser(), - inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), - schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), - enabled: Boolean = randomBoolean(), - triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, - enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false, - fanoutEnabled: Boolean? = true, -): Monitor { - return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, - schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), - fanoutEnabled = fanoutEnabled - ) -} - fun randomWorkflow( id: String = Workflow.NO_ID, monitorIds: List, diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index f03dbb017..91c97a636 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -96,8 +96,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( Monitor.NO_ID, @@ -137,7 +136,7 @@ public void onFailure(Exception e) { }; } else if (runMonitorParam.equals("multiple")) { SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello", - new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())))); + new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true)); BytesStreamOutput out1 = new BytesStreamOutput(); input2.writeTo(out1); BytesReference input1Serialized1 = out1.bytes(); @@ -159,8 +158,7 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( Monitor.NO_ID, @@ -222,7 +220,7 @@ public void onFailure(Exception e) { sampleRemoteDocLevelMonitorInput.writeTo(out2); BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes(); - DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList()); + DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true); RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput); Monitor remoteDocLevelMonitor = new Monitor( @@ -245,8 +243,7 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( Monitor.NO_ID,