diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java index 5685036f5..84447e7b5 100644 --- a/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java +++ b/src/main/java/org/opensearch/securityanalytics/correlation/JoinEngine.java @@ -9,12 +9,9 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; import org.opensearch.OpenSearchStatusException; -import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.routing.Preference; import org.opensearch.common.unit.TimeValue; import org.opensearch.commons.alerting.model.DocLevelQuery; -import org.opensearch.commons.notifications.model.ChannelMessage; -import org.opensearch.commons.notifications.model.EventSource; import org.opensearch.core.action.ActionListener; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; @@ -35,6 +32,9 @@ import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig; +import org.opensearch.securityanalytics.correlation.alert.CorrelationAlertService; +import org.opensearch.securityanalytics.correlation.alert.CorrelationRuleScheduler; +import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService; import org.opensearch.securityanalytics.logtype.LogTypeService; import org.opensearch.securityanalytics.model.CorrelationQuery; import org.opensearch.securityanalytics.model.CorrelationRule; @@ -42,9 +42,7 @@ import org.opensearch.securityanalytics.model.Detector; import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction; import org.opensearch.securityanalytics.util.AutoCorrelationsRepo; -import org.opensearch.securityanalytics.util.NotificationApiUtils; -import org.opensearch.securityanalytics.util.NotificationApiHelper; -import org.opensearch.commons.alerting.model.action.Action; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -353,10 +351,9 @@ private void getValidDocuments(String detectorType, List indices, List it.correlationRule).map(CorrelationRule::getId).collect(Collectors.toList()), + filteredCorrelationRules.stream().map(it -> it.correlationRule).collect(Collectors.toList()), autoCorrelations ); }, this::onFailure)); @@ -369,7 +366,7 @@ private void getValidDocuments(String detectorType, List indices, List> categoryToQueriesMap, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { + private void searchFindingsByTimestamp(String detectorType, Map> categoryToQueriesMap, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli(); MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List>> categoryToQueriesPairs = new ArrayList<>(); @@ -425,14 +422,14 @@ private void searchFindingsByTimestamp(String detectorType, Map relatedDocsMap, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { + private void searchDocsWithFilterKeys(String detectorType, Map relatedDocsMap, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List categories = new ArrayList<>(); @@ -483,7 +480,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map> filteredRelatedDocIds, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { + private void getCorrelatedFindings(String detectorType, Map> filteredRelatedDocIds, Map categoryToTimeWindowMap, List correlationRules, Map> autoCorrelations) { long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli(); MultiSearchRequest mSearchRequest = new MultiSearchRequest(); List categories = new ArrayList<>(); @@ -540,31 +537,6 @@ private void getCorrelatedFindings(String detectorType, Map for (SearchHit hit : hits) { findings.add(hit.getId()); } - for (FilteredCorrelationRule corrRule: correlationRules) { - List triggers = corrRule.correlationRule.getCorrelationTriggers(); - List list=new ArrayList<>(); - - log.info("triggers are: {}", triggers.toString()); - for(CorrelationRuleTrigger trigger: triggers) { - String severity = trigger.getSeverity(); - List actions = trigger.getActions(); - log.info("trigger Actions are: {}", actions.toString()); - - for(Action action: actions) { - if (action.getDestinationId() != null && !action.getDestinationId().isEmpty()) { - log.info("Destination id is: {}", action.getDestinationId()); - list.add(action.getDestinationId()); - try { - log.info("Reaching here and calling: {}", action.getDestinationId()); - NotificationApiUtils.sendNotification((NodeClient) client, action.getDestinationId(), "", list); - } catch (IOException e) { - log.info("Exception is: {}", e); - } - } - } - } - - } if (!findings.isEmpty()) { correlatedFindings.put(categories.get(idx), findings); @@ -572,6 +544,13 @@ private void getCorrelatedFindings(String detectorType, Map ++idx; } + CorrelationRuleScheduler correlationRuleScheduler = new CorrelationRuleScheduler(); + correlationRuleScheduler.schedule(correlationRules, correlatedFindings, request.getFinding().getId()); + log.info("Source correlated findings: {}", request.getFinding().getId()); + log.info("Get correlated findings: {}", correlatedFindings); + log.info("Source correlated findings: {}", request.getFinding().getId()); + log.info("Index correlated findings: {}", idx); + for (Map.Entry> autoCorrelation: autoCorrelations.entrySet()) { if (correlatedFindings.containsKey(autoCorrelation.getKey())) { Set alreadyCorrelatedFindings = new HashSet<>(correlatedFindings.get(autoCorrelation.getKey())); @@ -581,10 +560,10 @@ private void getCorrelatedFindings(String detectorType, Map correlatedFindings.put(autoCorrelation.getKey(), autoCorrelation.getValue()); } } - correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules); + correlateFindingAction.initCorrelationIndex(detectorType, correlatedFindings, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList())); }, this::onFailure)); } else { - getTimestampFeature(detectorType, correlationRules, autoCorrelations); + getTimestampFeature(detectorType, correlationRules.stream().map(CorrelationRule::getId).collect(Collectors.toList()), autoCorrelations); } } diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationRuleScheduler.java b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationRuleScheduler.java new file mode 100644 index 000000000..de4b6d166 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/correlation/alert/CorrelationRuleScheduler.java @@ -0,0 +1,87 @@ +package org.opensearch.securityanalytics.correlation.alert; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.securityanalytics.correlation.alert.notifications.NotificationService; +import org.opensearch.securityanalytics.model.CorrelationQuery; +import org.opensearch.securityanalytics.model.CorrelationRule; +import org.opensearch.securityanalytics.model.CorrelationRuleTrigger; + +import java.time.Instant; +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class CorrelationRuleScheduler { + + private static final Logger log = LogManager.getLogger(CorrelationRuleScheduler.class); + + public void schedule(List correlationRules, Map> correlatedFindings, String sourceFinding) { + // Create a map of correlation rule to list of finding IDs + Map> correlationRuleToFindingIds = new HashMap<>(); + for (CorrelationRule rule : correlationRules) { + CorrelationRuleTrigger trigger = rule.getCorrelationTrigger(); + if (trigger != null) { + List findingIds = new ArrayList<>(); + for (CorrelationQuery query : rule.getCorrelationQueries()) { + List categoryFindingIds = correlatedFindings.get(query.getCategory()); + if (categoryFindingIds != null) { + findingIds.addAll(categoryFindingIds); + } + } + correlationRuleToFindingIds.put(rule, findingIds); + // Simulate generating matched correlation rule IDs on rolling time window basis + scheduleRule(rule, findingIds); + } + } + } + public void scheduleRule(CorrelationRule correlationRule, List findingIds) { + Timer timer = new Timer(); + long startTime = Instant.now().toEpochMilli(); + long endTime = startTime + TimeUnit.MINUTES.toMillis(correlationRule.getCorrTimeWindow()); // Assuming time window is based on ruleId +// timer.schedule(new RuleTask(this.correlationAlertService, this.notificationService, correlationRule, findingIds, startTime, endTime), 0, 60000); // Check every minute + } + + static class RuleTask extends TimerTask { + private final CorrelationAlertService alertService; + private final NotificationService notificationService; + private final CorrelationRule correlationRule; + private final long startTime; + private final long endTime; + private final List correlatedFindingIds; + + + public RuleTask(CorrelationAlertService alertService, NotificationService notificationService, CorrelationRule correlationRule, List correlatedFindingIds, long startTime, long endTime) { + this.alertService = alertService; + this.notificationService = notificationService; + this.startTime = startTime; + this.endTime = endTime; + this.correlatedFindingIds = correlatedFindingIds; + this.correlationRule = correlationRule; + } + + @Override + public void run() { + long currentTime = Instant.now().toEpochMilli(); +// if (currentTime >= startTime && currentTime <= endTime) { // Within time window +// try { +// List activeAlertIds = alertService.getActiveAlertsList(correlationRule.getId(), startTime, endTime); +// if (activeAlertIds.isEmpty()) { +// Map correlationAlert = Map.of( +// "start_time", startTime, +// "end_time", endTime, +// "correlation_rule_id", correlationRule.getId(), +// "severity", correlationRule.getCorrelationTrigger().getSeverity() +// // add more fields; +// ); +// alertService.indexAlert(correlationAlert); +// //notificationService.sendNotification(alert); +// } else { +// alertService.updateActiveAlerts(activeAlertIds); +// } +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// } + } + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/alert/notifications/NotificationService.java b/src/main/java/org/opensearch/securityanalytics/correlation/alert/notifications/NotificationService.java new file mode 100644 index 000000000..e19d39941 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/correlation/alert/notifications/NotificationService.java @@ -0,0 +1,4 @@ +package org.opensearch.securityanalytics.correlation.alert.notifications; + +public class NotificationService { +} diff --git a/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java b/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java index 9f50594c9..c4a1d4e2c 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java +++ b/src/main/java/org/opensearch/securityanalytics/model/CorrelationRule.java @@ -10,6 +10,7 @@ import java.util.Objects; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.commons.authuser.User; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -29,7 +30,7 @@ public class CorrelationRule implements Writeable, ToXContentObject { public static final Long NO_VERSION = 1L; private static final String CORRELATION_QUERIES = "correlate"; private static final String CORRELATION_TIME_WINDOW = "time_window"; - private static final String TRIGGERS_FIELD = "triggers"; + private static final String TRIGGER_FIELD = "trigger"; private String id; @@ -41,19 +42,19 @@ public class CorrelationRule implements Writeable, ToXContentObject { private Long corrTimeWindow; - private List triggers; + private CorrelationRuleTrigger trigger; - public CorrelationRule(String id, Long version, String name, List correlationQueries, Long corrTimeWindow, List triggers) { + public CorrelationRule(String id, Long version, String name, List correlationQueries, Long corrTimeWindow, CorrelationRuleTrigger trigger) { this.id = id != null ? id : NO_ID; this.version = version != null ? version : NO_VERSION; this.name = name; this.correlationQueries = correlationQueries; this.corrTimeWindow = corrTimeWindow != null? corrTimeWindow: 300000L; - this.triggers = triggers; + this.trigger = trigger; } public CorrelationRule(StreamInput sin) throws IOException { - this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom), sin.readLong(), sin.readList(CorrelationRuleTrigger::readFrom)); + this(sin.readString(), sin.readLong(), sin.readString(), sin.readList(CorrelationQuery::readFrom), sin.readLong(), sin.readBoolean() ? new CorrelationRuleTrigger(sin) : null); } @Override @@ -64,11 +65,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws CorrelationQuery[] correlationQueries = new CorrelationQuery[] {}; correlationQueries = this.correlationQueries.toArray(correlationQueries); - CorrelationRuleTrigger[] correlationRuleTriggers = new CorrelationRuleTrigger[] {}; - correlationRuleTriggers = this.triggers.toArray(correlationRuleTriggers); builder.field(CORRELATION_QUERIES, correlationQueries); builder.field(CORRELATION_TIME_WINDOW, corrTimeWindow); - builder.field(TRIGGERS_FIELD, correlationRuleTriggers); + builder.field(TRIGGER_FIELD, trigger); return builder.endObject(); } @@ -82,7 +81,8 @@ public void writeTo(StreamOutput out) throws IOException { query.writeTo(out); } - for (CorrelationRuleTrigger trigger : triggers) { + out.writeBoolean(trigger != null); + if (trigger != null) { trigger.writeTo(out); } out.writeLong(corrTimeWindow); @@ -99,8 +99,7 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version) String name = null; List correlationQueries = new ArrayList<>(); Long corrTimeWindow = null; - List triggers = new ArrayList<>(); - + CorrelationRuleTrigger trigger = null; XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = xcp.currentName(); @@ -120,18 +119,18 @@ public static CorrelationRule parse(XContentParser xcp, String id, Long version) case CORRELATION_TIME_WINDOW: corrTimeWindow = xcp.longValue(); break; - case TRIGGERS_FIELD: - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); - while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { - CorrelationRuleTrigger trigger = CorrelationRuleTrigger.parse(xcp); - triggers.add(trigger); + case TRIGGER_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + trigger = null; + } else { + trigger = CorrelationRuleTrigger.parse(xcp); } break; default: xcp.skipChildren(); } } - return new CorrelationRule(id, version, name, correlationQueries, corrTimeWindow, triggers); + return new CorrelationRule(id, version, name, correlationQueries, corrTimeWindow, trigger); } public static CorrelationRule readFrom(StreamInput sin) throws IOException { @@ -170,8 +169,8 @@ public Long getCorrTimeWindow() { return corrTimeWindow; } - public List getCorrelationTriggers() { - return triggers; + public CorrelationRuleTrigger getCorrelationTrigger() { + return trigger; } @Override @@ -183,7 +182,7 @@ public boolean equals(Object o) { && version.equals(that.version) && name.equals(that.name) && correlationQueries.equals(that.correlationQueries) - && triggers.equals(that.triggers); + && trigger.equals(that.trigger); } @Override diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java index 910794556..130206e54 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java @@ -146,7 +146,6 @@ protected void doExecute(Task task, ActionRequest request, ActionListener { @@ -168,6 +167,19 @@ protected void doExecute(Task task, ActionRequest request, ActionListener { + if (createIndexResponse.isAcknowledged()) { + IndexUtils.correlationMetadataIndexUpdated(); + } else { + correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR)); + } + }, correlateFindingAction::onFailures)); + } catch (Exception ex) { + correlateFindingAction.onFailures(ex); + } + } } else { correlateFindingAction.onFailures(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR)); } diff --git a/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java b/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java index 624d76d58..375342d09 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java +++ b/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java @@ -36,6 +36,8 @@ public class CorrelationIndices { public static final String CORRELATION_HISTORY_INDEX_PATTERN_REGEXP = ".opensearch-sap-correlation-history*"; public static final String CORRELATION_HISTORY_WRITE_INDEX = ".opensearch-sap-correlation-history-write"; + + public static final String CORRELATION_ALERT_INDEX = ".opensearch-sap-correlation-alerts"; public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L; private final Client client; @@ -84,6 +86,11 @@ public boolean correlationMetadataIndexExists() { return clusterState.metadata().hasIndex(CORRELATION_METADATA_INDEX); } + public boolean correlationAlertIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.metadata().hasIndex(CORRELATION_ALERT_INDEX); + } + public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, ActionListener listener) throws IOException { try { long currentTimestamp = System.currentTimeMillis(); @@ -122,4 +129,17 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A throw ex; } } + + public static String correlationAlertIndexMappings() throws IOException { + return new String(Objects.requireNonNull(CorrelationIndices.class.getClassLoader().getResourceAsStream("mappings/correlation_alert_mapping.json")).readAllBytes(), Charset.defaultCharset()); + } + public void initCorrelationAlertIndex(ActionListener actionListener) throws IOException { + Settings correlationAlertSettings = Settings.builder() + .put("index.hidden", true) + .build(); + CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_ALERT_INDEX) + .mapping(correlationAlertIndexMappings()) + .settings(correlationAlertSettings); + client.admin().indices().create(indexRequest, actionListener); + } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java b/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java index ce358591e..a24286fda 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java +++ b/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java @@ -45,6 +45,8 @@ public class IndexUtils { public static String lastUpdatedCorrelationHistoryIndex = null; public static Boolean correlationRuleIndexUpdated = false; + public static Boolean correlationAlertIndexUpdated = false; + public static Boolean customLogTypeIndexUpdated = false; public static void detectorIndexUpdated() { @@ -65,6 +67,10 @@ public static void correlationMetadataIndexUpdated() { correlationMetadataIndexUpdated = true; } + public static void correlationAlertIndexUpdated() { + correlationAlertIndexUpdated = true; + } + public static void correlationRuleIndexUpdated() { correlationRuleIndexUpdated = true; } diff --git a/src/main/resources/mappings/correlation_alert_mapping.json b/src/main/resources/mappings/correlation_alert_mapping.json index 2178e4708..2a7acabda 100644 --- a/src/main/resources/mappings/correlation_alert_mapping.json +++ b/src/main/resources/mappings/correlation_alert_mapping.json @@ -1,102 +1,105 @@ { - "mappings": { - "dynamic": "strict", - "_meta": { - "schema_version": 1 + "_meta": { + "schema_version": 1 + }, + "properties": { + "acknowledged_time": { + "type": "date" }, - "properties": { - "acknowledged_time": { - "type": "date" - }, - "action_execution_results": { - "type": "nested", - "properties": { - "action_id": { - "type": "keyword" - }, - "last_execution_time": { - "type": "date" - }, - "throttled_count": { - "type": "integer" - } + "action_execution_results": { + "type": "nested", + "properties": { + "action_id": { + "type": "keyword" + }, + "last_execution_time": { + "type": "date" + }, + "throttled_count": { + "type": "integer" } - }, - "trigger_time": { - "type": "date" - }, - "error_message": { - "type": "text" - }, - "correlated_finding_ids": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword" - } + } + }, + "trigger_time": { + "type": "date" + }, + "error_message": { + "type": "text" + }, + "correlated_finding_ids": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" } - }, - "correlation_rule_id": { - "type": "keyword" - }, - "correlation_rule_name": { - "type": "text" - }, - "user": { - "properties": { - "backend_roles": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword" - } + } + }, + "correlation_rule_id": { + "type": "keyword" + }, + "correlation_rule_name": { + "type": "text" + }, + "user": { + "properties": { + "backend_roles": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" } - }, - "custom_attribute_names": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword" - } + } + }, + "custom_attribute_names": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" } - }, - "name": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword", - "ignore_above": 256 - } + } + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 } - }, - "roles": { - "type": "text", - "fields": { - "keyword": { - "type": "keyword" - } + } + }, + "roles": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" } } } - }, - "schema_version": { - "type": "integer" - }, - "severity": { - "type": "keyword" - }, - "state": { - "type": "keyword" - }, - "id": { - "type": "keyword" - }, - "trigger_name": { - "type": "text" - }, - "version": { - "type": "long" } + }, + "schema_version": { + "type": "integer" + }, + "severity": { + "type": "keyword" + }, + "state": { + "type": "keyword" + }, + "id": { + "type": "keyword" + }, + "trigger_name": { + "type": "text" + }, + "version": { + "type": "long" + }, + "start_time": { + "type": "date" + }, + "end_time": { + "type": "date" } } } \ No newline at end of file