Skip to content

Commit

Permalink
shards assignment to local Node when fanout flag is disabled
Browse files Browse the repository at this point in the history
Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn committed Dec 17, 2024
1 parent c6f985a commit 779afc3
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
* This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules
**/
val localNode = monitorCtx.clusterService!!.localNode()
val nodeMap: Map<String, DiscoveryNode> = if (monitor?.fanoutEnabled == true) {
val nodeMap: Map<String, DiscoveryNode> = if (docLevelMonitorInput?.fanoutEnabled == true) {
getNodes(monitorCtx)
} else {
logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2765,16 +2765,15 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
)

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery))
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false)

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES),
fanoutEnabled = false
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)

Expand Down
20 changes: 0 additions & 20 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -227,26 +227,6 @@ fun randomDocumentLevelMonitor(
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
inputs: List<Input> = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
fanoutEnabled: Boolean? = true,
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(),
fanoutEnabled = fanoutEnabled
)
}

fun randomWorkflow(
id: String = Workflow.NO_ID,
monitorIds: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
new DataSources(),
false,
false,
"sample-remote-monitor-plugin",
true
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down Expand Up @@ -137,7 +136,7 @@ public void onFailure(Exception e) {
};
} else if (runMonitorParam.equals("multiple")) {
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true));
BytesStreamOutput out1 = new BytesStreamOutput();
input2.writeTo(out1);
BytesReference input1Serialized1 = out1.bytes();
Expand All @@ -159,8 +158,7 @@ public void onFailure(Exception e) {
new DataSources(),
false,
false,
"sample-remote-monitor-plugin",
true
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down Expand Up @@ -222,7 +220,7 @@ public void onFailure(Exception e) {
sampleRemoteDocLevelMonitorInput.writeTo(out2);
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();

DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true);
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);

Monitor remoteDocLevelMonitor = new Monitor(
Expand All @@ -245,8 +243,7 @@ public void onFailure(Exception e) {
new DataSources(),
false,
false,
"sample-remote-monitor-plugin",
true
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down

0 comments on commit 779afc3

Please sign in to comment.