Skip to content

Commit

Permalink
notification for alerting in correlation
Browse files Browse the repository at this point in the history
  • Loading branch information
riysaxen-amzn committed Mar 5, 2024
1 parent 8ef0a3f commit da73e7e
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package org.opensearch.securityanalytics.correlation;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.commons.notifications.model.ChannelMessage;
import org.opensearch.commons.notifications.model.EventSource;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
Expand All @@ -36,10 +38,13 @@
import org.opensearch.securityanalytics.logtype.LogTypeService;
import org.opensearch.securityanalytics.model.CorrelationQuery;
import org.opensearch.securityanalytics.model.CorrelationRule;
import org.opensearch.securityanalytics.model.CorrelationRuleTrigger;
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.AutoCorrelationsRepo;

import org.opensearch.securityanalytics.util.NotificationApiUtils;
import org.opensearch.securityanalytics.util.NotificationApiHelper;
import org.opensearch.commons.alerting.model.action.Action;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -331,7 +336,6 @@ public void onResponse(MultiSearchResponse items) {
for (FilteredCorrelationRule rule: filteredCorrelationRules) {
List<CorrelationQuery> queries = rule.correlationRule.getCorrelationQueries();
Long timeWindow = rule.correlationRule.getCorrTimeWindow();

for (CorrelationQuery query: queries) {
List<CorrelationQuery> correlationQueries;
if (categoryToQueriesMap.containsKey(query.getCategory())) {
Expand Down Expand Up @@ -367,8 +371,9 @@ public void onResponse(MultiSearchResponse items) {
categoryToQueriesMap.put(query.getCategory(), correlationQueries);
}
}

searchFindingsByTimestamp(detectorType, categoryToQueriesMap, categoryToTimeWindowMap,
filteredCorrelationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList()),
filteredCorrelationRules,
autoCorrelations
);
}
Expand All @@ -391,7 +396,7 @@ public void onFailure(Exception e) {
* this method searches for parent findings given the log category & correlation time window & collects all related docs
* for them.
*/
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchFindingsByTimestamp(String detectorType, Map<String, List<CorrelationQuery>> categoryToQueriesMap, Map<String, Long> categoryToTimeWindowMap, List<FilteredCorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<Pair<String, List<CorrelationQuery>>> categoryToQueriesPairs = new ArrayList<>();
Expand Down Expand Up @@ -457,15 +462,16 @@ public void onFailure(Exception e) {
if (!autoCorrelations.isEmpty()) {
correlateFindingAction.getTimestampFeature(detectorType, autoCorrelations, null, List.of());
} else {
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRules);
List<String> correlationRuleIds = correlationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList());
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRuleIds);
}
}
}

/**
* Given the related docs from parent findings, this method filters only those related docs which match parent join criteria.
*/
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearchCriteria> relatedDocsMap, Map<String, Long> categoryToTimeWindowMap, List<FilteredCorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();

Expand Down Expand Up @@ -526,7 +532,8 @@ public void onFailure(Exception e) {
if (!autoCorrelations.isEmpty()) {
correlateFindingAction.getTimestampFeature(detectorType, autoCorrelations, null, List.of());
} else {
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRules);
List<String> correlationRuleIds = correlationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList());
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRuleIds);
}
}
}
Expand All @@ -535,7 +542,7 @@ public void onFailure(Exception e) {
* Given the filtered related docs of the parent findings, this method gets the actual filtered parent findings for
* the finding to be correlated.
*/
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<String> correlationRules, Map<String, List<String>> autoCorrelations) {
private void getCorrelatedFindings(String detectorType, Map<String, List<String>> filteredRelatedDocIds, Map<String, Long> categoryToTimeWindowMap, List<FilteredCorrelationRule> correlationRules, Map<String, List<String>> autoCorrelations) {
long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli();
MultiSearchRequest mSearchRequest = new MultiSearchRequest();
List<String> categories = new ArrayList<>();
Expand Down Expand Up @@ -591,6 +598,31 @@ public void onResponse(MultiSearchResponse items) {
}
++idx;
}
for (FilteredCorrelationRule corrRule: correlationRules) {
List<CorrelationRuleTrigger> triggers = corrRule.correlationRule.getCorrelationTriggers();
List<String> list=new ArrayList<>();

log.info("triggers are: {}", triggers.toString());
for(CorrelationRuleTrigger trigger: triggers) {
String severity = trigger.getSeverity();
List<Action> actions = trigger.getActions();
log.info("trigger Actions are: {}", actions.toString());

for(Action action: actions) {
if (action.getDestinationId() != null && !action.getDestinationId().isEmpty()) {
log.info("Destination id is: {}", action.getDestinationId());
list.add(action.getDestinationId());
try {
log.info("Reaching here and calling: {}", action.getDestinationId());
NotificationApiUtils.sendNotification((NodeClient) client, action.getDestinationId(), "", list);
} catch (IOException e) {
log.info("Exception is: {}", e);
}
}
}
}

}

for (Map.Entry<String, List<String>> autoCorrelation: autoCorrelations.entrySet()) {
if (correlatedFindings.containsKey(autoCorrelation.getKey())) {
Expand All @@ -601,7 +633,8 @@ public void onResponse(MultiSearchResponse items) {
correlatedFindings.put(autoCorrelation.getKey(), autoCorrelation.getValue());
}
}
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules);
List<String> correlationRuleIds = correlationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList());
correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRuleIds);
}

@Override
Expand All @@ -613,7 +646,8 @@ public void onFailure(Exception e) {
if (!autoCorrelations.isEmpty()) {
correlateFindingAction.getTimestampFeature(detectorType, autoCorrelations, null, List.of());
} else {
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRules);
List<String> correlationRuleIds = correlationRules.stream().map(it -> it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList());
correlateFindingAction.getTimestampFeature(detectorType, null, request.getFinding(), correlationRuleIds);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class CorrelationRule implements Writeable, ToXContentObject {
public static final Long NO_VERSION = 1L;
private static final String CORRELATION_QUERIES = "correlate";
private static final String CORRELATION_TIME_WINDOW = "time_window";
private static final String TRIGGERS_FIELD = "triggers";

private String id;

Expand All @@ -40,16 +41,19 @@ public class CorrelationRule implements Writeable, ToXContentObject {

private Long corrTimeWindow;

public CorrelationRule(String id, Long version, String name, List<CorrelationQuery> correlationQueries, Long corrTimeWindow) {
private List<CorrelationRuleTrigger> triggers;

public CorrelationRule(String id, Long version, String name, List<CorrelationQuery> correlationQueries, Long corrTimeWindow, List<CorrelationRuleTrigger> triggers) {
this.id = id != null ? id : NO_ID;
this.version = version != null ? version : NO_VERSION;
this.name = name;
this.correlationQueries = correlationQueries;
this.corrTimeWindow = corrTimeWindow != null? corrTimeWindow: 300000L;
this.triggers = triggers;
}

public CorrelationRule(StreamInput sin) throws IOException {
this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom), sin.readLong());
this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom), sin.readLong(), sin.readList(CorrelationRuleTrigger::readFrom));
}

@Override
Expand All @@ -60,8 +64,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

CorrelationQuery[] correlationQueries = new CorrelationQuery[] {};
correlationQueries = this.correlationQueries.toArray(correlationQueries);
CorrelationRuleTrigger[] correlationRuleTriggers = new CorrelationRuleTrigger[] {};
correlationRuleTriggers = this.triggers.toArray(correlationRuleTriggers);
builder.field(CORRELATION_QUERIES, correlationQueries);
builder.field(CORRELATION_TIME_WINDOW, corrTimeWindow);
builder.field(TRIGGERS_FIELD, correlationRuleTriggers);
return builder.endObject();
}

Expand All @@ -74,6 +81,10 @@ public void writeTo(StreamOutput out) throws IOException {
for (CorrelationQuery query : correlationQueries) {
query.writeTo(out);
}

for (CorrelationRuleTrigger trigger : triggers) {
trigger.writeTo(out);
}
out.writeLong(corrTimeWindow);
}

Expand All @@ -88,6 +99,7 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version)
String name = null;
List<CorrelationQuery> correlationQueries = new ArrayList<>();
Long corrTimeWindow = null;
List<CorrelationRuleTrigger> triggers = new ArrayList<>();

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -108,11 +120,18 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version)
case CORRELATION_TIME_WINDOW:
corrTimeWindow = xcp.longValue();
break;
case TRIGGERS_FIELD:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
CorrelationRuleTrigger trigger = CorrelationRuleTrigger.parse(xcp);
triggers.add(trigger);
}
break;
default:
xcp.skipChildren();
}
}
return new CorrelationRule(id, version, name, correlationQueries, corrTimeWindow);
return new CorrelationRule(id, version, name, correlationQueries, corrTimeWindow, triggers);
}

public static CorrelationRule readFrom(StreamInput sin) throws IOException {
Expand Down Expand Up @@ -151,6 +170,10 @@ public Long getCorrTimeWindow() {
return corrTimeWindow;
}

public List<CorrelationRuleTrigger> getCorrelationTriggers() {
return triggers;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -159,7 +182,8 @@ public boolean equals(Object o) {
return id.equals(that.id)
&& version.equals(that.version)
&& name.equals(that.name)
&& correlationQueries.equals(that.correlationQueries);
&& correlationQueries.equals(that.correlationQueries)
&& triggers.equals(that.triggers);
}

@Override
Expand Down
Loading

0 comments on commit da73e7e

Please sign in to comment.