diff --git a/plugins/events-correlation-engine/src/internalClusterTest/java/org/opensearch/plugin/correlation/EventsCorrelationPluginTransportIT.java b/plugins/events-correlation-engine/src/internalClusterTest/java/org/opensearch/plugin/correlation/EventsCorrelationPluginTransportIT.java index 6eac0e59fa2ed..75cf38fc10ce0 100644 --- a/plugins/events-correlation-engine/src/internalClusterTest/java/org/opensearch/plugin/correlation/EventsCorrelationPluginTransportIT.java +++ b/plugins/events-correlation-engine/src/internalClusterTest/java/org/opensearch/plugin/correlation/EventsCorrelationPluginTransportIT.java @@ -14,10 +14,20 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.NestedQueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationRequest; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationResponse; +import org.opensearch.plugin.correlation.events.model.Correlation; import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction; import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleRequest; import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleResponse; @@ -34,6 +44,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -174,4 +186,693 @@ public void testCreatingACorrelationRuleWithNoTimestampField() throws Exception .get("timestampField") ); } + + public void testEventOnIndexWithNoRules() throws ExecutionException, InterruptedException { + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List correlationQueries = List.of( + new CorrelationQuery("app_logs", "endpoint:\\/customer_records.txt", "timestamp", List.of()) + ); + CorrelationRule correlationRule = new CorrelationRule("windows to app logs", correlationQueries); + IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, request).get(); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse response = client().index(indexRequestWindows).get(); + String eventId = response.getId(); + + IndexCorrelationRequest correlationRequest = new IndexCorrelationRequest("windows", eventId, false); + IndexCorrelationResponse correlationResponse = client().execute(IndexCorrelationAction.INSTANCE, correlationRequest).get(); + + Assert.assertEquals(200, correlationResponse.getStatus().getStatus()); + Assert.assertTrue(correlationResponse.getOrphan()); + Assert.assertEquals(0, correlationResponse.getNeighborEvents().size()); + } + + public void testEventOnIndexWithNoMatchingRules() throws ExecutionException, InterruptedException { + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List correlationQueries = Arrays.asList( + new CorrelationQuery("windows", "host.hostname:EC2BMAZ*", "winlog.timestamp", List.of()) + ); + CorrelationRule correlationRule = new CorrelationRule("windows to app logs", correlationQueries); + IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, request).get(); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse response = client().index(indexRequestWindows).get(); + String eventId = response.getId(); + + IndexCorrelationRequest correlationRequest = new IndexCorrelationRequest("windows", eventId, false); + IndexCorrelationResponse correlationResponse = client().execute(IndexCorrelationAction.INSTANCE, correlationRequest).get(); + + Assert.assertEquals(200, correlationResponse.getStatus().getStatus()); + Assert.assertTrue(correlationResponse.getOrphan()); + Assert.assertEquals(0, correlationResponse.getNeighborEvents().size()); + } + + public void testCorrelationWithSingleRule() throws ExecutionException, InterruptedException { + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List correlationQueries = Arrays.asList( + new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "winlog.timestamp", List.of()), + new CorrelationQuery("app_logs", "endpoint:\\/customer_records.txt", "timestamp", List.of()) + ); + CorrelationRule correlationRule = new CorrelationRule("windows to app logs", correlationQueries); + IndexCorrelationRuleRequest request = new IndexCorrelationRuleRequest(correlationRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, request).get(); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequestWindows).get(); + + IndexRequest indexRequestAppLogs = new IndexRequest("app_logs") + .source(sampleAppLogsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse response = client().index(indexRequestAppLogs).get(); + String eventId = response.getId(); + + IndexCorrelationRequest correlationRequest = new IndexCorrelationRequest("app_logs", eventId, false); + IndexCorrelationResponse correlationResponse = client().execute(IndexCorrelationAction.INSTANCE, correlationRequest).get(); + + Assert.assertEquals(200, correlationResponse.getStatus().getStatus()); + Assert.assertEquals(1, correlationResponse.getNeighborEvents().size()); + Assert.assertFalse(correlationResponse.getOrphan()); + } + + public void testCorrelationWithMultipleRule() throws ExecutionException, InterruptedException { + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List correlationQueries1 = Arrays.asList( + new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "winlog.timestamp", List.of()), + new CorrelationQuery("app_logs", "endpoint:\\/customer_records.txt", "timestamp", List.of()) + ); + CorrelationRule correlationRule1 = new CorrelationRule("windows to app logs", correlationQueries1); + IndexCorrelationRuleRequest request1 = new IndexCorrelationRuleRequest(correlationRule1, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, request1).get(); + + List correlationQueries2 = Arrays.asList( + new CorrelationQuery("windows", "host.hostname:EC2BMAZ*", "winlog.timestamp", List.of()), + new CorrelationQuery("app_logs", "endpoint:\\/customer_records1.txt", "timestamp", List.of()) + ); + CorrelationRule correlationRule2 = new CorrelationRule("windows to app logs", correlationQueries2); + IndexCorrelationRuleRequest request2 = new IndexCorrelationRuleRequest(correlationRule2, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, request2).get(); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequestWindows).get(); + + IndexRequest indexRequestAppLogs = new IndexRequest("app_logs") + .source(sampleAppLogsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse response = client().index(indexRequestAppLogs).get(); + String eventId = response.getId(); + + IndexCorrelationRequest correlationRequest = new IndexCorrelationRequest("app_logs", eventId, false); + IndexCorrelationResponse correlationResponse = client().execute(IndexCorrelationAction.INSTANCE, correlationRequest).get(); + + Assert.assertEquals(200, correlationResponse.getStatus().getStatus()); + Assert.assertFalse(correlationResponse.getOrphan()); + Assert.assertEquals(1, correlationResponse.getNeighborEvents().size()); + } + + public void testStoringCorrelationWithMultipleRule() throws ExecutionException, InterruptedException { + String networkIndex = "vpc_flow"; + CreateIndexRequest networkRequest = new CreateIndexRequest(networkIndex) + .mapping(networkMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(networkRequest).get(); + + String adLdapIndex = "ad_logs"; + CreateIndexRequest adLdapRequest = new CreateIndexRequest(adLdapIndex) + .mapping(adLdapMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(adLdapRequest).get(); + + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + String s3AccessLogsIndex = "s3_access_logs"; + CreateIndexRequest s3AccessLogsRequest = new CreateIndexRequest(s3AccessLogsIndex) + .mapping(s3AccessLogsMapping()).settings(Settings.EMPTY); + + client().admin().indices().create(s3AccessLogsRequest).get(); + + List windowsAppLogsQuery = Arrays.asList( + new CorrelationQuery("windows", "host.hostname:EC2AMAZ*", "winlog.timestamp", List.of()), + new CorrelationQuery("app_logs", "endpoint:\\/customer_records.txt", "timestamp", List.of()) + ); + CorrelationRule windowsAppLogsRule = new CorrelationRule("windows to app logs", windowsAppLogsQuery); + IndexCorrelationRuleRequest windowsAppLogsRequest = new IndexCorrelationRuleRequest(windowsAppLogsRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, windowsAppLogsRequest).get(); + + List networkWindowsAdLdapQuery = Arrays.asList( + new CorrelationQuery("vpc_flow", "dstaddr:4.5.6.7 or dstaddr:4.5.6.6", "timestamp", List.of()), + new CorrelationQuery("windows", "winlog.event_data.SubjectDomainName:NTAUTHORI*", "winlog.timestamp", List.of()), + new CorrelationQuery("ad_logs", "ResultType:50126", "timestamp", List.of()) + ); + CorrelationRule networkWindowsAdLdapRule = new CorrelationRule("netowrk to windows to ad/ldap", networkWindowsAdLdapQuery); + IndexCorrelationRuleRequest networkWindowsAdLdapRequest = new IndexCorrelationRuleRequest(networkWindowsAdLdapRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, networkWindowsAdLdapRequest).get(); + + List s3AppLogsQuery = Arrays.asList( + new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "timestamp", List.of()), + new CorrelationQuery("app_logs", "keywords:PermissionDenied", "timestamp", List.of()) + ); + CorrelationRule s3AppLogsRule = new CorrelationRule("s3 to app logs", s3AppLogsQuery); + IndexCorrelationRuleRequest s3AppLogsRequest = new IndexCorrelationRuleRequest(s3AppLogsRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, s3AppLogsRequest).get(); + + IndexRequest indexRequestNetwork = new IndexRequest("vpc_flow") + .source(sampleNetworkEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseNetwork = client().index(indexRequestNetwork).get(); + + String networkEventId = indexResponseNetwork.getId(); + IndexCorrelationRequest networkCorrelationRequest = new IndexCorrelationRequest("vpc_flow", networkEventId, true); + IndexCorrelationResponse networkCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, networkCorrelationRequest).get(); + Assert.assertEquals(200, networkCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(true, networkCorrelationResponse.getOrphan()); + + IndexRequest indexRequestAdLdap = new IndexRequest("ad_logs") + .source(sampleAdLdapEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAdLdap = client().index(indexRequestAdLdap).get(); + + String adLdapEventId = indexResponseAdLdap.getId(); + IndexCorrelationRequest adLdapCorrelationRequest = new IndexCorrelationRequest("ad_logs", adLdapEventId, true); + IndexCorrelationResponse adLdapCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, adLdapCorrelationRequest).get(); + Assert.assertEquals(200, adLdapCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, adLdapCorrelationResponse.getOrphan()); + Assert.assertEquals(1, adLdapCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseWindows = client().index(indexRequestWindows).get(); + + String windowsEventId = indexResponseWindows.getId(); + IndexCorrelationRequest windowsCorrelationRequest = new IndexCorrelationRequest("windows", windowsEventId, true); + IndexCorrelationResponse windowsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, windowsCorrelationRequest).get(); + Assert.assertEquals(200, windowsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, windowsCorrelationResponse.getOrphan()); + Assert.assertEquals(2, windowsCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestAppLogs = new IndexRequest("app_logs") + .source(sampleAppLogsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAppLogs = client().index(indexRequestAppLogs).get(); + + String appLogsEventId = indexResponseAppLogs.getId(); + IndexCorrelationRequest appLogsCorrelationRequest = new IndexCorrelationRequest("app_logs", appLogsEventId, true); + IndexCorrelationResponse appLogsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, appLogsCorrelationRequest).get(); + Assert.assertEquals(200, appLogsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, appLogsCorrelationResponse.getOrphan()); + Assert.assertEquals(1, appLogsCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestS3Logs = new IndexRequest("s3_access_logs") + .source(sampleS3AccessEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseS3 = client().index(indexRequestS3Logs).get(); + + String s3EventId = indexResponseS3.getId(); + IndexCorrelationRequest s3CorrelationRequest = new IndexCorrelationRequest("s3_access_logs", s3EventId, true); + IndexCorrelationResponse s3CorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, s3CorrelationRequest).get(); + Assert.assertEquals(200, s3CorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, s3CorrelationResponse.getOrphan()); + Assert.assertEquals(1, s3CorrelationResponse.getNeighborEvents().size()); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.matchAllQuery()); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(100); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + SearchResponse searchResponse = client().search(searchRequest).get(); + Assert.assertEquals(12L, searchResponse.getHits().getTotalHits().value); + } + + public void testStoringCorrelationWithMultipleLevels() throws ExecutionException, InterruptedException { + String networkIndex = "vpc_flow"; + CreateIndexRequest networkRequest = new CreateIndexRequest(networkIndex) + .mapping(networkMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(networkRequest).get(); + + String adLdapIndex = "ad_logs"; + CreateIndexRequest adLdapRequest = new CreateIndexRequest(adLdapIndex) + .mapping(adLdapMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(adLdapRequest).get(); + + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List networkWindowsAdLdapQuery = Arrays.asList( + new CorrelationQuery("vpc_flow", "dstaddr:4.5.6.7 or dstaddr:4.5.6.6", "timestamp", List.of()), + new CorrelationQuery("windows", "winlog.event_data.SubjectDomainName:NTAUTHORI*", "winlog.timestamp", List.of()), + new CorrelationQuery("ad_logs", "ResultType:50126", "timestamp", List.of()) + ); + CorrelationRule networkWindowsAdLdapRule = new CorrelationRule("netowrk to windows to ad/ldap", networkWindowsAdLdapQuery); + IndexCorrelationRuleRequest networkWindowsAdLdapRequest = new IndexCorrelationRuleRequest(networkWindowsAdLdapRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, networkWindowsAdLdapRequest).get(); + + IndexRequest indexRequestNetwork = new IndexRequest("vpc_flow") + .source(sampleNetworkEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseNetwork = client().index(indexRequestNetwork).get(); + + String networkEventId = indexResponseNetwork.getId(); + IndexCorrelationRequest networkCorrelationRequest = new IndexCorrelationRequest("vpc_flow", networkEventId, true); + IndexCorrelationResponse networkCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, networkCorrelationRequest).get(); + Assert.assertEquals(200, networkCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(true, networkCorrelationResponse.getOrphan()); + + IndexRequest indexRequestAdLdap = new IndexRequest("ad_logs") + .source(sampleAdLdapEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAdLdap = client().index(indexRequestAdLdap).get(); + + String adLdapEventId = indexResponseAdLdap.getId(); + IndexCorrelationRequest adLdapCorrelationRequest = new IndexCorrelationRequest("ad_logs", adLdapEventId, true); + IndexCorrelationResponse adLdapCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, adLdapCorrelationRequest).get(); + Assert.assertEquals(200, adLdapCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, adLdapCorrelationResponse.getOrphan()); + Assert.assertEquals(1, adLdapCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseWindows = client().index(indexRequestWindows).get(); + + String windowsEventId = indexResponseWindows.getId(); + IndexCorrelationRequest windowsCorrelationRequest = new IndexCorrelationRequest("windows", windowsEventId, true); + IndexCorrelationResponse windowsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, windowsCorrelationRequest).get(); + Assert.assertEquals(200, windowsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, windowsCorrelationResponse.getOrphan()); + Assert.assertEquals(2, windowsCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestAppLogs = new IndexRequest("app_logs") + .source(sampleAppLogsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAppLogs = client().index(indexRequestAppLogs).get(); + + String appLogsEventId = indexResponseAppLogs.getId(); + IndexCorrelationRequest appLogsCorrelationRequest = new IndexCorrelationRequest("app_logs", appLogsEventId, true); + IndexCorrelationResponse appLogsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, appLogsCorrelationRequest).get(); + Assert.assertEquals(200, appLogsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(true, appLogsCorrelationResponse.getOrphan()); + Assert.assertEquals(0, appLogsCorrelationResponse.getNeighborEvents().size()); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.matchQuery("index1", "app_logs")); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(100); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + SearchResponse searchResponse = client().search(searchRequest).get(); + Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value); + Assert.assertEquals(100, Objects.requireNonNull(searchResponse.getHits().getHits()[0].getSourceAsMap()).get("level")); + } + + public void testStoringCorrelationWithMultipleLevelsWithSeparateGroups() throws ExecutionException, InterruptedException { + String networkIndex = "vpc_flow"; + CreateIndexRequest networkRequest = new CreateIndexRequest(networkIndex) + .mapping(networkMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(networkRequest).get(); + + String adLdapIndex = "ad_logs"; + CreateIndexRequest adLdapRequest = new CreateIndexRequest(adLdapIndex) + .mapping(adLdapMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(adLdapRequest).get(); + + String windowsIndex = "windows"; + CreateIndexRequest windowsRequest = new CreateIndexRequest(windowsIndex) + .mapping(windowsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(windowsRequest).get(); + + String appLogsIndex = "app_logs"; + CreateIndexRequest appLogsRequest = new CreateIndexRequest(appLogsIndex) + .mapping(appLogsMappings()).settings(Settings.EMPTY); + + client().admin().indices().create(appLogsRequest).get(); + + List networkWindowsAdLdapQuery = Arrays.asList( + new CorrelationQuery("vpc_flow", "dstaddr:4.5.6.7 or dstaddr:4.5.6.6", "timestamp", List.of()), + new CorrelationQuery("windows", "winlog.event_data.SubjectDomainName:NTAUTHORI*", "winlog.timestamp", List.of()), + new CorrelationQuery("ad_logs", "ResultType:50126", "timestamp", List.of()) + ); + CorrelationRule networkWindowsAdLdapRule = new CorrelationRule("netowrk to windows to ad/ldap", networkWindowsAdLdapQuery); + IndexCorrelationRuleRequest networkWindowsAdLdapRequest = new IndexCorrelationRuleRequest(networkWindowsAdLdapRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, networkWindowsAdLdapRequest).get(); + + List s3AppLogsQuery = Arrays.asList( + new CorrelationQuery("s3_access_logs", "aws.cloudtrail.eventName:ReplicateObject", "timestamp", List.of()), + new CorrelationQuery("app_logs", "keywords:PermissionDenied", "timestamp", List.of()) + ); + CorrelationRule s3AppLogsRule = new CorrelationRule("s3 to app logs", s3AppLogsQuery); + IndexCorrelationRuleRequest s3AppLogsRequest = new IndexCorrelationRuleRequest(s3AppLogsRule, RestRequest.Method.POST); + client().execute(IndexCorrelationRuleAction.INSTANCE, s3AppLogsRequest).get(); + + IndexRequest indexRequestNetwork = new IndexRequest("vpc_flow") + .source(sampleNetworkEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseNetwork = client().index(indexRequestNetwork).get(); + + String networkEventId = indexResponseNetwork.getId(); + IndexCorrelationRequest networkCorrelationRequest = new IndexCorrelationRequest("vpc_flow", networkEventId, true); + IndexCorrelationResponse networkCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, networkCorrelationRequest).get(); + Assert.assertEquals(200, networkCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(true, networkCorrelationResponse.getOrphan()); + + IndexRequest indexRequestAdLdap = new IndexRequest("ad_logs") + .source(sampleAdLdapEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAdLdap = client().index(indexRequestAdLdap).get(); + + String adLdapEventId = indexResponseAdLdap.getId(); + IndexCorrelationRequest adLdapCorrelationRequest = new IndexCorrelationRequest("ad_logs", adLdapEventId, true); + IndexCorrelationResponse adLdapCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, adLdapCorrelationRequest).get(); + Assert.assertEquals(200, adLdapCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, adLdapCorrelationResponse.getOrphan()); + Assert.assertEquals(1, adLdapCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestWindows = new IndexRequest("windows") + .source(sampleWindowsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseWindows = client().index(indexRequestWindows).get(); + + String windowsEventId = indexResponseWindows.getId(); + IndexCorrelationRequest windowsCorrelationRequest = new IndexCorrelationRequest("windows", windowsEventId, true); + IndexCorrelationResponse windowsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, windowsCorrelationRequest).get(); + Assert.assertEquals(200, windowsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, windowsCorrelationResponse.getOrphan()); + Assert.assertEquals(2, windowsCorrelationResponse.getNeighborEvents().size()); + + IndexRequest indexRequestAppLogs = new IndexRequest("app_logs") + .source(sampleAppLogsEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseAppLogs = client().index(indexRequestAppLogs).get(); + + String appLogsEventId = indexResponseAppLogs.getId(); + IndexCorrelationRequest appLogsCorrelationRequest = new IndexCorrelationRequest("app_logs", appLogsEventId, true); + IndexCorrelationResponse appLogsCorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, appLogsCorrelationRequest).get(); + Assert.assertEquals(200, appLogsCorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(true, appLogsCorrelationResponse.getOrphan()); + + IndexRequest indexRequestS3Logs = new IndexRequest("s3_access_logs") + .source(sampleS3AccessEvent(), XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponseS3 = client().index(indexRequestS3Logs).get(); + + String s3EventId = indexResponseS3.getId(); + IndexCorrelationRequest s3CorrelationRequest = new IndexCorrelationRequest("s3_access_logs", s3EventId, true); + IndexCorrelationResponse s3CorrelationResponse = client().execute(IndexCorrelationAction.INSTANCE, s3CorrelationRequest).get(); + Assert.assertEquals(200, s3CorrelationResponse.getStatus().getStatus()); + Assert.assertEquals(false, s3CorrelationResponse.getOrphan()); + Assert.assertEquals(1, s3CorrelationResponse.getNeighborEvents().size()); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery("index2", "app_logs")) + .must(QueryBuilders.matchQuery("index1", "s3_access_logs"))); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(100); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + SearchResponse searchResponse = client().search(searchRequest).get(); + Assert.assertEquals(1L, searchResponse.getHits().getTotalHits().value); + Assert.assertEquals(75, Objects.requireNonNull(searchResponse.getHits().getHits()[0].getSourceAsMap()).get("level")); + } + + private String networkMappings() { + return "\"properties\": {\n" + + " \"version\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"account-id\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"interface-id\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"srcaddr\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"dstaddr\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"srcport\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"dstport\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"severity_id\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"class_name\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " }"; + } + + private String sampleNetworkEvent() { + return "{\n" + + " \"version\": 1,\n" + + " \"account-id\": \"A12345\",\n" + + " \"interface-id\": \"I12345\",\n" + + " \"srcaddr\": \"1.2.3.4\",\n" + + " \"dstaddr\": \"4.5.6.7\",\n" + + " \"srcport\": 9000,\n" + + " \"dstport\": 8000,\n" + + " \"severity_id\": \"-1\",\n" + + " \"class_name\": \"Network Activity\",\n" + + " \"timestamp\": " + System.currentTimeMillis() + "\n" + + "}"; + } + + private String adLdapMappings() { + return "\"properties\": {\n" + + " \"ResultType\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"ResultDescription\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"winlog.event_data.TargetUserName\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " }"; + } + + private String sampleAdLdapEvent() { + return "{\n" + + " \"ResultType\": 50126,\n" + + " \"ResultDescription\": \"Invalid username or password or Invalid on-premises username or password.\",\n" + + " \"winlog.event_data.TargetUserName\": \"DEYSUBHO\",\n" + + " \"timestamp\": " + System.currentTimeMillis() + "\n" + + "}"; + } + + private String windowsMappings() { + return " \"properties\": {\n" + + " \"server.user.hash\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"winlog.event_id\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"host.hostname\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"windows.message\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"winlog.provider_name\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"winlog.event_data.ServiceName\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"winlog.timestamp\": {\n" + + " \"type\": \"long\"\n" + + " }\n" + + " }\n"; + } + + private String sampleWindowsEvent() { + return "{\n" + + " \"EventTime\": \"2020-02-04T14:59:39.343541+00:00\",\n" + + " \"host.hostname\": \"EC2AMAZ-EPO7HKA\",\n" + + " \"Keywords\": \"9223372036854775808\",\n" + + " \"SeverityValue\": 2,\n" + + " \"Severity\": \"INFO\",\n" + + " \"winlog.event_id\": 22,\n" + + " \"SourceName\": \"Microsoft-Windows-Sysmon\",\n" + + " \"ProviderGuid\": \"{5770385F-C22A-43E0-BF4C-06F5698FFBD9}\",\n" + + " \"Version\": 5,\n" + + " \"TaskValue\": 22,\n" + + " \"OpcodeValue\": 0,\n" + + " \"RecordNumber\": 9532,\n" + + " \"ExecutionProcessID\": 1996,\n" + + " \"ExecutionThreadID\": 2616,\n" + + " \"Channel\": \"Microsoft-Windows-Sysmon/Operational\",\n" + + " \"winlog.event_data.SubjectDomainName\": \"NTAUTHORITY\",\n" + + " \"AccountName\": \"SYSTEM\",\n" + + " \"UserID\": \"S-1-5-18\",\n" + + " \"AccountType\": \"User\",\n" + + " \"windows.message\": \"Dns query:\\r\\nRuleName: \\r\\nUtcTime: 2020-02-04 14:59:38.349\\r\\nProcessGuid: {b3c285a4-3cda-5dc0-0000-001077270b00}\\r\\nProcessId: 1904\\r\\nQueryName: EC2AMAZ-EPO7HKA\\r\\nQueryStatus: 0\\r\\nQueryResults: 172.31.46.38;\\r\\nImage: C:\\\\Program Files\\\\nxlog\\\\nxlog.exe\",\n" + + " \"Category\": \"Dns query (rule: DnsQuery)\",\n" + + " \"Opcode\": \"Info\",\n" + + " \"UtcTime\": \"2020-02-04 14:59:38.349\",\n" + + " \"ProcessGuid\": \"{b3c285a4-3cda-5dc0-0000-001077270b00}\",\n" + + " \"ProcessId\": \"1904\",\n" + + " \"QueryName\": \"EC2AMAZ-EPO7HKA\",\n" + + " \"QueryStatus\": \"0\",\n" + + " \"QueryResults\": \"172.31.46.38;\",\n" + + " \"Image\": \"C:\\\\Program Files\\\\nxlog\\\\regsvr32.exe\",\n" + + " \"EventReceivedTime\": \"2020-02-04T14:59:40.780905+00:00\",\n" + + " \"SourceModuleName\": \"in\",\n" + + " \"SourceModuleType\": \"im_msvistalog\",\n" + + " \"CommandLine\": \"eachtest\",\n" + + " \"Initiated\": \"true\",\n" + + " \"winlog.timestamp\": " + System.currentTimeMillis() + "\n" + + "}"; + } + + private String appLogsMappings() { + return " \"properties\": {\n" + + " \"http_method\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"endpoint\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"keywords\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"type\": \"long\"\n" + + " }\n" + + " }\n"; + } + + private String sampleAppLogsEvent() { + return "{\n" + + " \"endpoint\": \"/customer_records.txt\",\n" + + " \"http_method\": \"POST\",\n" + + " \"keywords\": \"PermissionDenied\",\n" + + " \"timestamp\": " + System.currentTimeMillis() + "\n" + + "}"; + } + + private String s3AccessLogsMapping() { + return "\"properties\": {\n" + + " \"aws.cloudtrail.eventSource\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"aws.cloudtrail.eventName\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"aws.cloudtrail.eventTime\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"type\": \"long\"\n" + + " }\n" + + " }"; + } + + private String sampleS3AccessEvent() { + return "{\n" + + " \"aws.cloudtrail.eventSource\": \"s3.amazonaws.com\",\n" + + " \"aws.cloudtrail.eventName\": \"ReplicateObject\",\n" + + " \"aws.cloudtrail.eventTime\": 1,\n" + + " \"timestamp\": " + System.currentTimeMillis() + "\n" + + "}"; + } } diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java index 443a794bd99df..4b5ee21dbb037 100644 --- a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/EventsCorrelationPlugin.java @@ -23,10 +23,17 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction; +import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction; +import org.opensearch.plugin.correlation.events.transport.TransportIndexCorrelationAction; +import org.opensearch.plugin.correlation.events.transport.TransportSearchCorrelatedEventsAction; +import org.opensearch.plugin.correlation.events.transport.TransportStoreCorrelationAction; import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction; import org.opensearch.plugin.correlation.rules.resthandler.RestIndexCorrelationRuleAction; import org.opensearch.plugin.correlation.rules.transport.TransportIndexCorrelationRuleAction; import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings; +import org.opensearch.plugin.correlation.utils.CorrelationIndices; import org.opensearch.plugin.correlation.utils.CorrelationRuleIndices; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; @@ -57,6 +64,8 @@ public class EventsCorrelationPlugin extends Plugin implements ActionPlugin { private CorrelationRuleIndices correlationRuleIndices; + private CorrelationIndices correlationIndices; + /** * Default constructor */ @@ -77,7 +86,8 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { correlationRuleIndices = new CorrelationRuleIndices(client, clusterService); - return List.of(correlationRuleIndices); + correlationIndices = new CorrelationIndices(client, clusterService, clusterService.getSettings()); + return List.of(correlationRuleIndices, correlationIndices); } @Override @@ -95,11 +105,18 @@ public List getRestHandlers( @Override public List> getActions() { - return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class)); + return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class), + new ActionPlugin.ActionHandler<>(IndexCorrelationAction.INSTANCE, TransportIndexCorrelationAction.class), + new ActionPlugin.ActionHandler<>(StoreCorrelationAction.INSTANCE, TransportStoreCorrelationAction.class), + new ActionPlugin.ActionHandler<>(SearchCorrelatedEventsAction.INSTANCE, TransportSearchCorrelatedEventsAction.class)); } @Override public List> getSettings() { - return List.of(EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, EventsCorrelationSettings.CORRELATION_TIME_WINDOW); + return List.of( + EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, + EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS, + EventsCorrelationSettings.CORRELATION_TIME_WINDOW + ); } } diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationAction.java new file mode 100644 index 0000000000000..5ebdb6d403ecf --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionType; + +public class IndexCorrelationAction extends ActionType { + + public static final IndexCorrelationAction INSTANCE = new IndexCorrelationAction(); + public static final String NAME = "cluster:admin/index/correlation/events"; + + private IndexCorrelationAction() { + super(NAME, IndexCorrelationResponse::new); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationRequest.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationRequest.java new file mode 100644 index 0000000000000..0fe0aab809f31 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationRequest.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class IndexCorrelationRequest extends ActionRequest { + + private String index; + + private String event; + + private Boolean store; + + public IndexCorrelationRequest(String index, String event, Boolean store) { + super(); + this.index = index; + this.event = event; + this.store = store; + } + + public IndexCorrelationRequest(StreamInput sin) throws IOException { + this(sin.readString(), sin.readString(), sin.readBoolean()); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(event); + out.writeBoolean(store); + } + + public String getIndex() { + return index; + } + + public String getEvent() { + return event; + } + + public Boolean getStore() { + return store; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationResponse.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationResponse.java new file mode 100644 index 0000000000000..cddb79d5a54c1 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/IndexCorrelationResponse.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class IndexCorrelationResponse extends ActionResponse { + + private Boolean isOrphan; + + private Map> neighborEvents; + + private RestStatus status; + + public IndexCorrelationResponse(Boolean isOrphan, Map> neighborEvents, RestStatus status) { + super(); + this.isOrphan = isOrphan; + this.neighborEvents = neighborEvents; + this.status = status; + } + + public IndexCorrelationResponse(StreamInput sin) throws IOException { + this( + sin.readBoolean(), + sin.readMap(StreamInput::readString, StreamInput::readStringList), + sin.readEnum(RestStatus.class) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(isOrphan); + out.writeMap(neighborEvents, StreamOutput::writeString, StreamOutput::writeStringCollection); + out.writeEnum(status); + } + + public RestStatus getStatus() { + return status; + } + + public Boolean getOrphan() { + return isOrphan; + } + + public Map> getNeighborEvents() { + return neighborEvents; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsAction.java new file mode 100644 index 0000000000000..7d6884118c41c --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionType; + +public class SearchCorrelatedEventsAction extends ActionType { + + public static final SearchCorrelatedEventsAction INSTANCE = new SearchCorrelatedEventsAction(); + public static final String NAME = "cluster:admin/search/correlation/events"; + + private SearchCorrelatedEventsAction() { + super(NAME, SearchCorrelatedEventsResponse::new); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsRequest.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsRequest.java new file mode 100644 index 0000000000000..e9d46c0a3203d --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsRequest.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class SearchCorrelatedEventsRequest extends ActionRequest { + + private String index; + + private String event; + + private String timestampField; + + private Long timeWindow; + + private Integer nearbyEvents; + + public SearchCorrelatedEventsRequest(String index, String event, String timestampField, Long timeWindow, Integer nearbyEvents) { + super(); + this.index = index; + this.event = event; + this.timestampField = timestampField; + this.timeWindow = timeWindow; + this.nearbyEvents = nearbyEvents; + } + + public SearchCorrelatedEventsRequest(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readString(), + sin.readString(), + sin.readLong(), + sin.readInt() + ); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(event); + out.writeString(timestampField); + out.writeLong(timeWindow); + out.writeInt(nearbyEvents); + } + + public String getIndex() { + return index; + } + + public String getEvent() { + return event; + } + + public String getTimestampField() { + return timestampField; + } + + public Long getTimeWindow() { + return timeWindow; + } + + public Integer getNearbyEvents() { + return nearbyEvents; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsResponse.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsResponse.java new file mode 100644 index 0000000000000..d81d518346d0f --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/SearchCorrelatedEventsResponse.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.correlation.events.model.EventWithScore; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class SearchCorrelatedEventsResponse extends ActionResponse implements ToXContentObject { + + private List events; + + private RestStatus status; + + private static final String EVENTS = "events"; + + public SearchCorrelatedEventsResponse(List events, RestStatus status) { + super(); + this.events = events; + this.status = status; + } + + public SearchCorrelatedEventsResponse(StreamInput sin) throws IOException { + this( + Collections.unmodifiableList(sin.readList(EventWithScore::new)), + sin.readEnum(RestStatus.class) + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject() + .field(EVENTS, events) + .endObject(); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(events); + out.writeEnum(status); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationAction.java new file mode 100644 index 0000000000000..2aa56b39b5fdd --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationAction.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionType; + +public class StoreCorrelationAction extends ActionType { + + public static final StoreCorrelationAction INSTANCE = new StoreCorrelationAction(); + public static final String NAME = "cluster:admin/store/correlation/events"; + + private StoreCorrelationAction() { + super(NAME, StoreCorrelationResponse::new); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationRequest.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationRequest.java new file mode 100644 index 0000000000000..156eef15ab9c6 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationRequest.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class StoreCorrelationRequest extends ActionRequest { + + private String index; + + private String event; + + private Long timestamp; + + private Map> eventsAdjacencyList; + + private List tags; + + public StoreCorrelationRequest(String index, String event, Long timestamp, Map> eventsAdjacencyList, List tags) { + super(); + this.index = index; + this.event = event; + this.timestamp = timestamp; + this.eventsAdjacencyList = eventsAdjacencyList; + this.tags = tags; + } + + public StoreCorrelationRequest(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readString(), + sin.readLong(), + sin.readMap(StreamInput::readString, StreamInput::readStringList), + sin.readStringList() + ); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(event); + out.writeLong(timestamp); + out.writeMap(eventsAdjacencyList, StreamOutput::writeString, StreamOutput::writeStringCollection); + out.writeStringCollection(tags); + } + + public String getEvent() { + return event; + } + + public String getIndex() { + return index; + } + + public Long getTimestamp() { + return timestamp; + } + + public List getTags() { + return tags; + } + + public Map> getEventsAdjacencyList() { + return eventsAdjacencyList; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationResponse.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationResponse.java new file mode 100644 index 0000000000000..f832bbc743f28 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/action/StoreCorrelationResponse.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; + +public class StoreCorrelationResponse extends ActionResponse { + + private RestStatus status; + + public StoreCorrelationResponse(RestStatus status) { + super(); + this.status = status; + } + + public StoreCorrelationResponse(StreamInput sin) throws IOException { + this( + sin.readEnum(RestStatus.class) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(status); + } + + public RestStatus getStatus() { + return status; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/Correlation.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/Correlation.java new file mode 100644 index 0000000000000..3d86ab4e3a22f --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/Correlation.java @@ -0,0 +1,261 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.model; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.XContentParserUtils; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class Correlation implements Writeable, ToXContentObject { + + private static final Logger log = LogManager.getLogger(Correlation.class); + + public static final String CORRELATION_HISTORY_INDEX = ".opensearch-correlation-history"; + + private static final String ROOT_FIELD = "root"; + private static final String LEVEL_FIELD = "level"; + private static final String EVENT1_FIELD = "event1"; + private static final String EVENT2_FIELD = "event2"; + private static final String CORRELATION_VECTOR_FIELD = "corr_vector"; + private static final String TIMESTAMP_FIELD = "timestamp"; + private static final String INDEX1_FIELD = "index1"; + private static final String INDEX2_FIELD = "index2"; + private static final String TAGS_FIELD = "tags"; + private static final String SCORE_TIMESTAMP_FIELD = "score_timestamp"; + + private String id; + + private Long version; + + private Boolean isRoot; + + private Long level; + + private String event1; + + private String event2; + + private float[] correlationVector; + + private Long timestamp; + + private String index1; + + private String index2; + + private List tags; + + private Long scoreTimestamp; + + public Correlation(String id, + Long version, + Boolean isRoot, + Long level, + String event1, + String event2, + float[] correlationVector, + Long timestamp, + String index1, + String index2, + List tags, + Long scoreTimestamp) { + this.id = id; + this.version = version; + this.isRoot = isRoot; + this.level = level; + this.event1 = event1; + this.event2 = event2; + this.correlationVector = correlationVector; + this.timestamp = timestamp; + this.index1 = index1; + this.index2 = index2; + this.tags = tags; + this.scoreTimestamp = scoreTimestamp; + } + + public Correlation(Boolean isRoot, + Long level, + String event1, + String event2, + float[] correlationVector, + Long timestamp, + String index1, + String index2, + List tags, + Long scoreTimestamp) { + this("", + 1L, + isRoot, + level, + event1, + event2, + correlationVector, + timestamp, + index1, + index2, + tags, + scoreTimestamp); + } + + public Correlation(StreamInput sin) throws IOException { + this(sin.readString(), + sin.readLong(), + sin.readBoolean(), + sin.readLong(), + sin.readString(), + sin.readString(), + sin.readFloatArray(), + sin.readLong(), + sin.readString(), + sin.readString(), + sin.readStringList(), + sin.readLong()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ROOT_FIELD, isRoot); + builder.field(LEVEL_FIELD, level); + builder.field(EVENT1_FIELD, event1); + builder.field(EVENT2_FIELD, event2); + builder.field(CORRELATION_VECTOR_FIELD, correlationVector); + builder.field(TIMESTAMP_FIELD, timestamp); + builder.field(INDEX1_FIELD, index1); + builder.field(INDEX2_FIELD, index2); + builder.field(TAGS_FIELD, tags); + builder.field(SCORE_TIMESTAMP_FIELD, scoreTimestamp); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeLong(version); + out.writeBoolean(isRoot); + out.writeLong(level); + out.writeString(event1); + out.writeString(event2); + out.writeFloatArray(correlationVector); + out.writeLong(timestamp); + out.writeString(index1); + out.writeString(index2); + out.writeStringCollection(tags); + out.writeLong(scoreTimestamp); + } + + public static Correlation parse(XContentParser xcp, String id, Long version) throws IOException { + Boolean isRoot = null; + Long level = null; + String event1 = null; + String event2 = null; + float[] correlationVector = null; + Long timestamp = null; + String index1 = null; + String index2 = null; + List tags = new ArrayList<>(); + Long scoreTimestamp = null; + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = xcp.currentName(); + xcp.nextToken(); + + switch (fieldName) { + case ROOT_FIELD: + isRoot = xcp.booleanValue(); + break; + case LEVEL_FIELD: + level = xcp.longValue(); + break; + case EVENT1_FIELD: + event1 = xcp.text(); + break; + case EVENT2_FIELD: + event2 = xcp.text(); + break; + case CORRELATION_VECTOR_FIELD: + List correlationVectorList = new ArrayList<>(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + correlationVectorList.add(xcp.floatValue()); + } + + correlationVector = new float[correlationVectorList.size()]; + for (int idx = 0; idx < correlationVectorList.size(); ++idx) { + correlationVector[idx] = correlationVectorList.get(idx); + } + break; + case TIMESTAMP_FIELD: + timestamp = xcp.longValue(); + break; + case INDEX1_FIELD: + index1 = xcp.text(); + break; + case INDEX2_FIELD: + index2 = xcp.text(); + break; + case TAGS_FIELD: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + tags.add(xcp.text()); + } + break; + case SCORE_TIMESTAMP_FIELD: + scoreTimestamp = xcp.longValue(); + break; + default: + xcp.skipChildren(); + } + } + + return new Correlation( + isRoot, + level, + event1, + event2, + correlationVector, + timestamp, + index1, + index2, + tags, + scoreTimestamp); + } + + public static Correlation readFrom(StreamInput sin) throws IOException { + return new Correlation(sin); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Correlation that = (Correlation) o; + return id.equals(that.id) && version.equals(that.version) && isRoot.equals(that.isRoot) && level.equals(that.level) && event1.equals(that.event1) && event2.equals(that.event2) && Arrays.equals(correlationVector, that.correlationVector) && timestamp.equals(that.timestamp) && index1.equals(that.index1) && index2.equals(that.index2) && tags.equals(that.tags) && scoreTimestamp.equals(that.scoreTimestamp); + } + + @Override + public int hashCode() { + int result = Objects.hash(id, version, isRoot, level, event1, event2, timestamp, index1, index2, tags, scoreTimestamp); + result = 31 * result + Arrays.hashCode(correlationVector); + return result; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/EventWithScore.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/EventWithScore.java new file mode 100644 index 0000000000000..d8e7e1b66e9ab --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/model/EventWithScore.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.model; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.XContentParserUtils; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class EventWithScore implements Writeable, ToXContentObject { + + private static final String INDEX_FIELD = "index"; + private static final String EVENT_FIELD = "event"; + private static final String SCORE_FIELD = "score"; + private static final String TAGS_FIELD = "tags"; + + private String index; + + private String event; + + private Double score; + + private List tags; + + public EventWithScore(String index, String event, Double score, List tags) { + this.index = index; + this.event = event; + this.score = score; + this.tags = tags; + } + + public EventWithScore(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readString(), + sin.readDouble(), + sin.readStringList() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(event); + out.writeDouble(score); + out.writeStringCollection(tags); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject() + .field(INDEX_FIELD, index) + .field(EVENT_FIELD, event) + .field(SCORE_FIELD, score) + .field(TAGS_FIELD, tags); + return builder.endObject(); + } + + public static EventWithScore parse(XContentParser xcp) throws IOException { + String index = null; + String event = null; + Double score = null; + List tags = new ArrayList<>(); + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = xcp.currentName(); + xcp.nextToken(); + + switch (fieldName) { + case INDEX_FIELD: + index = xcp.text(); + break; + case EVENT_FIELD: + event = xcp.text(); + break; + case SCORE_FIELD: + score = xcp.doubleValue(); + break; + case TAGS_FIELD: + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + String tag = xcp.text(); + tags.add(tag); + } + break; + default: + xcp.skipChildren(); + } + } + return new EventWithScore(index, event, score, tags); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/resthandler/RestSearchCorrelatedEventsAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/resthandler/RestSearchCorrelatedEventsAction.java new file mode 100644 index 0000000000000..f347a0138c078 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/resthandler/RestSearchCorrelatedEventsAction.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.resthandler; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; + +public class RestSearchCorrelatedEventsAction extends BaseRestHandler { + + @Override + public String getName() { + return null; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + return null; + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportIndexCorrelationAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportIndexCorrelationAction.java new file mode 100644 index 0000000000000..46184e19f1fa6 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportIndexCorrelationAction.java @@ -0,0 +1,460 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.search.join.ScoreMode; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.NestedQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationRequest; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationResponse; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationRequest; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationResponse; +import org.opensearch.plugin.correlation.rules.model.CorrelationQuery; +import org.opensearch.plugin.correlation.rules.model.CorrelationRule; +import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TransportIndexCorrelationAction extends HandledTransportAction { + + private static final Logger log = LogManager.getLogger(TransportIndexCorrelationAction.class); + + private final Client client; + + private final NamedXContentRegistry xContentRegistry; + + private final Settings settings; + + private final ClusterService clusterService; + + private volatile long correlationTimeWindow; + + @Inject + public TransportIndexCorrelationAction( + TransportService transportService, + Client client, + NamedXContentRegistry xContentRegistry, + Settings settings, + ActionFilters actionFilters, + ClusterService clusterService) { + super(IndexCorrelationAction.NAME, transportService, actionFilters, IndexCorrelationRequest::new); + this.client = client; + this.xContentRegistry = xContentRegistry; + this.settings = settings; + this.clusterService = clusterService; + this.correlationTimeWindow = EventsCorrelationSettings.CORRELATION_TIME_WINDOW.get(this.settings).getMillis(); + + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(EventsCorrelationSettings.CORRELATION_TIME_WINDOW, it -> correlationTimeWindow = it.getMillis()); + } + + @Override + protected void doExecute(Task task, IndexCorrelationRequest request, ActionListener listener) { + AsyncIndexCorrelationAction asyncAction = new AsyncIndexCorrelationAction(request, listener); + asyncAction.start(); + } + + class AsyncIndexCorrelationAction { + private final IndexCorrelationRequest request; + + private final ActionListener listener; + + AsyncIndexCorrelationAction(IndexCorrelationRequest request, ActionListener listener) { + this.request = request; + this.listener = listener; + } + + void start() { + String inputIndex = request.getIndex(); + String event = request.getEvent(); + + NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery( + "correlate", + QueryBuilders.matchQuery("correlate.index", inputIndex), + ScoreMode.None + ); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + + Iterator hits = response.getHits().iterator(); + List correlationRules = new ArrayList<>(); + while (hits.hasNext()) { + try { + SearchHit hit = hits.next(); + + XContentParser xcp = XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString() + ); + + CorrelationRule rule = CorrelationRule.parse(xcp, hit.getId(), hit.getVersion()); + correlationRules.add(rule); + } catch (IOException e) { + onFailures(e); + } + } + + prepRulesForCorrelatedEventsGeneration(inputIndex, event, correlationRules); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void prepRulesForCorrelatedEventsGeneration(String index, String event, List correlationRules) { + MultiSearchRequest mSearchRequest = new MultiSearchRequest(); + SearchRequest eventSearchRequest = null; + + for (CorrelationRule rule: correlationRules) { + // assuming no index duplication in a rule. + Optional query = rule.getCorrelationQueries().stream() + .filter(correlationQuery -> correlationQuery.getIndex().equals(index)).findFirst(); + + if (query.isPresent()) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery("_id", event)) + .must(QueryBuilders.queryStringQuery(query.get().getQuery())); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(false); + + // assuming all queries belonging to an index use the same timestamp field. + searchSourceBuilder.fetchField(query.get().getTimestampField()); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(index); + searchRequest.source(searchSourceBuilder); + mSearchRequest.add(searchRequest); + + if (eventSearchRequest == null) { + SearchSourceBuilder eventSearchSourceBuilder = new SearchSourceBuilder(); + eventSearchSourceBuilder.query(QueryBuilders.matchQuery("_id", event)); + eventSearchSourceBuilder.fetchSource(false); + + // assuming all queries belonging to an index use the same timestamp field. + eventSearchSourceBuilder.fetchField(query.get().getTimestampField()); + + eventSearchRequest = new SearchRequest(); + eventSearchRequest.indices(index); + eventSearchRequest.source(eventSearchSourceBuilder); + } + } + } + + if (!mSearchRequest.requests().isEmpty()) { + SearchRequest finalEventSearchRequest = eventSearchRequest; + client.multiSearch(mSearchRequest, new ActionListener<>() { + @Override + public void onResponse(MultiSearchResponse items) { + MultiSearchResponse.Item[] responses = items.getResponses(); + Map> indexQueriesMap = new HashMap<>(); + Long timestamp = null; + + int idx = 0; + for (MultiSearchResponse.Item response: responses) { + if (response.isFailure()) { + // suppress exception + continue; + } + + SearchHits searchHits = response.getResponse().getHits(); + if (searchHits.getTotalHits().value == 1) { + for (CorrelationQuery query: correlationRules.get(idx).getCorrelationQueries()) { + List queries; + if (indexQueriesMap.containsKey(query.getIndex())) { + queries = indexQueriesMap.get(query.getIndex()); + } else { + queries = new ArrayList<>(); + } + queries.add(query); + indexQueriesMap.put(query.getIndex(), queries); + + if (query.getIndex().equals(index)) { + // assuming all queries belonging to an index use the same timestamp field. + timestamp = searchHits.getAt(0).getFields().get(query.getTimestampField()).getValue(); + } + } + } + ++idx; + } + + if (timestamp == null) { + client.search(finalEventSearchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + SearchHits searchHits = response.getHits(); + if (searchHits.getTotalHits().value == 1) { + Optional timestampField = searchHits.getAt(0).getFields().keySet().stream().findFirst(); + timestampField.ifPresent(s -> generateCorrelatedEvents(index, event, + searchHits.getAt(0).getFields().get(s).getValue(), indexQueriesMap)); + } else { + onFailures(new OpenSearchStatusException("failed at generate correlated events", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + generateCorrelatedEvents(index, event, timestamp, indexQueriesMap); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + // orphan event + if (request.getStore()) { + StoreCorrelationRequest storeCorrelationRequest = new StoreCorrelationRequest( + index, + event, + // as there are no rules there is no timestamp field, assuming event is inserted now. + System.currentTimeMillis(), + Map.of(), + List.of() + ); + client.execute(StoreCorrelationAction.INSTANCE, storeCorrelationRequest, new ActionListener<>() { + @Override + public void onResponse(StoreCorrelationResponse response) { + if (response.getStatus().equals(RestStatus.OK)) { + onOperation(true, new HashMap<>()); + } else { + onFailures(new OpenSearchStatusException("Failed to store correlations", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + onOperation(true, new HashMap<>()); + } + } + } + + private void generateCorrelatedEvents(String inputIndex, String event, Long timestamp, Map> indexQueriesMap) { + MultiSearchRequest mSearchRequest = new MultiSearchRequest(); + + for (Map.Entry> indexQueriesEntry: indexQueriesMap.entrySet()) { + String index = indexQueriesEntry.getKey(); + List correlationQueries = indexQueriesEntry.getValue(); + + // assuming all queries belonging to an index use the same timestamp field. + String timestampField = correlationQueries.get(0).getTimestampField(); + + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.rangeQuery(timestampField) + .gte(timestamp - correlationTimeWindow) + .lte(timestamp + correlationTimeWindow)); + + if (index.equals(inputIndex)) { + queryBuilder = queryBuilder.mustNot(QueryBuilders.matchQuery("_id", event)); + } + + for (CorrelationQuery query: correlationQueries) { + queryBuilder = queryBuilder.should(QueryBuilders.queryStringQuery(query.getQuery())); + } + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(false); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(index); + searchRequest.source(searchSourceBuilder); + + mSearchRequest.add(searchRequest); + } + + if (!mSearchRequest.requests().isEmpty()) { + client.multiSearch(mSearchRequest, new ActionListener<>() { + @Override + public void onResponse(MultiSearchResponse items) { + MultiSearchResponse.Item[] responses = items.getResponses(); + Map> eventsAdjacencyList = new HashMap<>(); + + for (MultiSearchResponse.Item response: responses) { + if (response.isFailure()) { + // suppress exception + continue; + } + + Iterator searchHits = response.getResponse().getHits().iterator(); + + while (searchHits.hasNext()) { + SearchHit hit = searchHits.next(); + + String index = hit.getIndex(); + String id = hit.getId(); + + Set neighborEvents; + if (eventsAdjacencyList.containsKey(index)) { + neighborEvents = eventsAdjacencyList.get(index); + } else { + neighborEvents = new HashSet<>(); + } + neighborEvents.add(id); + eventsAdjacencyList.put(index, neighborEvents); + } + } + + Map> neighborEvents = new HashMap<>(); + for (Map.Entry> neighborEvent: eventsAdjacencyList.entrySet()) { + neighborEvents.put(neighborEvent.getKey(), new ArrayList<>(neighborEvent.getValue())); + } + + if (request.getStore()) { + StoreCorrelationRequest storeCorrelationRequest = new StoreCorrelationRequest( + inputIndex, + event, + timestamp, + neighborEvents, + List.of() + ); + client.execute(StoreCorrelationAction.INSTANCE, storeCorrelationRequest, new ActionListener<>() { + @Override + public void onResponse(StoreCorrelationResponse response) { + if (response.getStatus().equals(RestStatus.OK)) { + if (neighborEvents.isEmpty()) { + onOperation(true, neighborEvents); + } else { + onOperation(false, neighborEvents); + } + } else { + onFailures(new OpenSearchStatusException("Failed to store correlations", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + if (neighborEvents.isEmpty()) { + onOperation(true, neighborEvents); + } else { + onOperation(false, neighborEvents); + } + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + // orphan event + if (request.getStore()) { + StoreCorrelationRequest storeCorrelationRequest = new StoreCorrelationRequest( + inputIndex, + event, + timestamp, + Map.of(), + List.of() + ); + client.execute(StoreCorrelationAction.INSTANCE, storeCorrelationRequest, new ActionListener<>() { + @Override + public void onResponse(StoreCorrelationResponse response) { + if (response.getStatus().equals(RestStatus.OK)) { + onOperation(true, new HashMap<>()); + } else { + onFailures(new OpenSearchStatusException("Failed to store correlations", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + onOperation(true, new HashMap<>()); + } + } + } + + private void onOperation(Boolean isOrphan, Map> neighborEvents) { + finishHim(isOrphan, neighborEvents, null); + } + + private void onFailures(Exception t) { + finishHim(null, null, t); + } + + private void finishHim(Boolean isOrphan, Map> neighborEvents, Exception t) { + if (t != null) { + listener.onFailure(t); + } else { + listener.onResponse(new IndexCorrelationResponse(isOrphan, neighborEvents, RestStatus.OK)); + } + } + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportSearchCorrelatedEventsAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportSearchCorrelatedEventsAction.java new file mode 100644 index 0000000000000..a049bfa3c66c3 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportSearchCorrelatedEventsAction.java @@ -0,0 +1,262 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.transport; + +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.common.inject.Inject; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.correlation.core.index.query.CorrelationQueryBuilder; +import org.opensearch.plugin.correlation.events.action.IndexCorrelationResponse; +import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction; +import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsRequest; +import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsResponse; +import org.opensearch.plugin.correlation.events.model.Correlation; +import org.opensearch.plugin.correlation.events.model.EventWithScore; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TransportSearchCorrelatedEventsAction extends HandledTransportAction< + SearchCorrelatedEventsRequest, + SearchCorrelatedEventsResponse> { + + private final Client client; + + @Inject + public TransportSearchCorrelatedEventsAction(TransportService transportService, Client client, ActionFilters actionFilters) { + super(SearchCorrelatedEventsAction.NAME, transportService, actionFilters, SearchCorrelatedEventsRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, SearchCorrelatedEventsRequest request, ActionListener listener) { + AsyncSearchCorrelatedEventsAction asyncAction = new AsyncSearchCorrelatedEventsAction(request, listener); + asyncAction.start(); + } + + class AsyncSearchCorrelatedEventsAction { + + private SearchCorrelatedEventsRequest request; + private ActionListener listener; + + AsyncSearchCorrelatedEventsAction(SearchCorrelatedEventsRequest request, ActionListener listener) { + this.request = request; + this.listener = listener; + } + + void start() { + String index = request.getIndex(); + String event = request.getEvent(); + String timestampField = request.getTimestampField(); + Long timeWindow = request.getTimeWindow(); + Integer nearbyEvents = request.getNearbyEvents(); + + MatchQueryBuilder queryBuilder = QueryBuilders.matchQuery( + "_id", event + ); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.fetchField(timestampField); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(index); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Event not found", RestStatus.INTERNAL_SERVER_ERROR)); + } + + SearchHit hit = response.getHits().getAt(0); + long eventTimestamp = hit.getFields().get(timestampField).getValue(); + + BoolQueryBuilder scoreQueryBuilder = QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("score_timestamp", 0L)); + SearchSourceBuilder scoreSearchSourceBuilder = new SearchSourceBuilder(); + scoreSearchSourceBuilder.query(scoreQueryBuilder); + scoreSearchSourceBuilder.fetchSource(true); + scoreSearchSourceBuilder.size(1); + + SearchRequest scoreSearchRequest = new SearchRequest(); + scoreSearchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + scoreSearchRequest.source(scoreSearchSourceBuilder); + + client.search(scoreSearchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Score Root Record not found", RestStatus.INTERNAL_SERVER_ERROR)); + } + + Map source = response.getHits().getHits()[0].getSourceAsMap(); + assert source != null; + long scoreTimestamp = (long) source.get("score_timestamp"); + + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery( + "event1", event + )).must(QueryBuilders.matchQuery( + "event2", "" + )); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(false); + searchSourceBuilder.fetchField("level"); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Event not found in Correlation Index", RestStatus.INTERNAL_SERVER_ERROR)); + } + + SearchHit hit = response.getHits().getHits()[0]; + long level = hit.getFields().get("level").getValue(); + float[] query = new float[3]; + for (int i = 0; i < 2; ++i) { + query[i] = (2.0f * ((float) level) - 50.0f) / 2.0f; + } + query[2] = Long.valueOf((eventTimestamp - scoreTimestamp) / 1000L).floatValue(); + + CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, nearbyEvents, QueryBuilders.boolQuery() + .mustNot(QueryBuilders.matchQuery( + "event1", "" + )).mustNot(QueryBuilders.matchQuery( + "event2", "" + )).filter(QueryBuilders.rangeQuery("timestamp") + .gte(eventTimestamp - timeWindow) + .lte(eventTimestamp + timeWindow))); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(correlationQueryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(nearbyEvents); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + SearchHit[] hits = response.getHits().getHits(); + Map correlatedFindings = new HashMap<>(); + + for (SearchHit hit: hits) { + Map source = hit.getSourceAsMap(); + assert source != null; + if (!source.get("event1").toString().equals(event)) { + String eventKey1 = source.get("event1").toString() + " " + source.get("index1").toString(); + + if (correlatedFindings.containsKey(eventKey1)) { + correlatedFindings.put(eventKey1, Math.max(correlatedFindings.get(eventKey1), hit.getScore())); + } else { + correlatedFindings.put(eventKey1, (double) hit.getScore()); + } + } + if (!source.get("event2").toString().equals(event)) { + String eventKey2 = source.get("event2").toString() + " " + source.get("index2").toString(); + + if (correlatedFindings.containsKey(eventKey2)) { + correlatedFindings.put(eventKey2, Math.max(correlatedFindings.get(eventKey2), hit.getScore())); + } else { + correlatedFindings.put(eventKey2, (double) hit.getScore()); + } + } + } + + List eventWithScores = new ArrayList<>(); + for (Map.Entry correlatedFinding: correlatedFindings.entrySet()) { + String[] eventIndexSplit = correlatedFinding.getKey().split(" "); + eventWithScores.add(new EventWithScore(eventIndexSplit[1], eventIndexSplit[0], correlatedFinding.getValue(), List.of())); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void onOperation(List events) { + finishHim(events, null); + } + + private void onFailures(Exception t) { + finishHim(null, t); + } + + private void finishHim(List events, Exception t) { + if (t != null) { + listener.onFailure(t); + } else { + listener.onResponse(new SearchCorrelatedEventsResponse(events, RestStatus.OK)); + } + } + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportStoreCorrelationAction.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportStoreCorrelationAction.java new file mode 100644 index 0000000000000..a115b392a32a9 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/events/transport/TransportStoreCorrelationAction.java @@ -0,0 +1,707 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.events.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.correlation.core.index.query.CorrelationQueryBuilder; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationRequest; +import org.opensearch.plugin.correlation.events.action.StoreCorrelationResponse; +import org.opensearch.plugin.correlation.events.model.Correlation; +import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings; +import org.opensearch.plugin.correlation.utils.CorrelationIndices; +import org.opensearch.plugin.correlation.utils.IndexUtils; +import org.opensearch.rest.RestStatus; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class TransportStoreCorrelationAction extends HandledTransportAction { + + private static final Logger log = LogManager.getLogger(TransportStoreCorrelationAction.class); + + private final Client client; + + private final Settings settings; + + private final CorrelationIndices correlationIndices; + + private final ClusterService clusterService; + + private final long setupTimestamp; + + private volatile long correlationTimeWindow; + + @Inject + public TransportStoreCorrelationAction( + TransportService transportService, + Client client, + Settings settings, + ActionFilters actionFilters, + CorrelationIndices correlationIndices, + ClusterService clusterService + ) { + super(StoreCorrelationAction.NAME, transportService, actionFilters, StoreCorrelationRequest::new); + this.client = client; + this.settings = settings; + this.correlationIndices = correlationIndices; + this.clusterService = clusterService; + this.setupTimestamp = System.currentTimeMillis(); + this.correlationTimeWindow = EventsCorrelationSettings.CORRELATION_TIME_WINDOW.get(this.settings).getMillis(); + + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(EventsCorrelationSettings.CORRELATION_TIME_WINDOW, it -> correlationTimeWindow = it.getMillis()); + } + + @Override + protected void doExecute(Task task, StoreCorrelationRequest request, ActionListener listener) { + AsyncStoreCorrelationAction asyncAction = new AsyncStoreCorrelationAction(request, listener); + + if (!this.correlationIndices.correlationIndexExists()) { + try { + this.correlationIndices.initCorrelationIndex(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse response) { + if (response.isAcknowledged()) { + IndexUtils.correlationHistoryIndexUpdated(); + try { + correlationIndices.setupCorrelationIndex(setupTimestamp, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (!response.hasFailures()) { + asyncAction.start(); + } else { + asyncAction.onFailures(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + asyncAction.onFailures(e); + } + }); + } catch (IOException e) { + asyncAction.onFailures(e); + } + } + } + + @Override + public void onFailure(Exception e) { + asyncAction.onFailures(e); + } + }); + } catch (IOException e) { + asyncAction.onFailures(e); + } + } else { + asyncAction.start(); + } + } + + class AsyncStoreCorrelationAction { + private final StoreCorrelationRequest request; + + private final ActionListener listener; + + AsyncStoreCorrelationAction(StoreCorrelationRequest request, ActionListener listener) { + this.request = request; + this.listener = listener; + } + + void start() { + prepareCorrelationHistoryIndex(); + } + + private void prepareCorrelationHistoryIndex() { + try { + if (!IndexUtils.correlationHistoryIndexUpdated) { + IndexUtils.updateIndexMapping( + Correlation.CORRELATION_HISTORY_INDEX, + CorrelationIndices.correlationMappings(), clusterService.state(), client.admin().indices(), + new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + if (response.isAcknowledged()) { + IndexUtils.correlationHistoryIndexUpdated(); + generateTimestampFeature(); + } else { + onFailures(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + } + ); + } else { + generateTimestampFeature(); + } + } catch (IOException ex) { + onFailures(ex); + } + } + + private void generateTimestampFeature() { + Long eventTimestamp = request.getTimestamp(); + + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("score_timestamp", 0L)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Score Root Record not found", RestStatus.INTERNAL_SERVER_ERROR)); + } + + SearchHit hit = response.getHits().getHits()[0]; + String id = hit.getId(); + Map source = hit.getSourceAsMap(); + + long scoreTimestamp = source != null? (long) source.get("score_timestamp"): 0L; + if (eventTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL > scoreTimestamp) { + try { + Correlation scoreRootRecord = new Correlation(id, + 1L, + false, + 0L, + "", + "", + new float[]{}, + 0L, + "", + "", + List.of(), + eventTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL); + + IndexRequest scoreIndexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .id(id) + .source(scoreRootRecord.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.index(scoreIndexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.OK)) { + if (request.getEventsAdjacencyList() == null || request.getEventsAdjacencyList().isEmpty()) { + insertOrphanEvents(Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue()); + } else { + insertCorrelatedEvents(Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue()); + } + } else { + onFailures(new OpenSearchStatusException("Failed to update Score Root record", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } catch (IOException ex) { + onFailures(ex); + } + } else { + float timestampFeature = Long.valueOf((eventTimestamp - scoreTimestamp) / 1000L).floatValue(); + if (request.getEventsAdjacencyList() == null || request.getEventsAdjacencyList().isEmpty()) { + insertOrphanEvents(timestampFeature); + } else { + insertCorrelatedEvents(timestampFeature); + } + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void insertCorrelatedEvents(float timestampFeature) { + long eventTimestamp = request.getTimestamp(); + Map> neighborEvents = request.getEventsAdjacencyList(); + + MatchQueryBuilder queryBuilder = QueryBuilders.matchQuery( + "root", true + ); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Root Record not found", RestStatus.INTERNAL_SERVER_ERROR)); + } + + SearchHit hit = response.getHits().getHits()[0]; + Map source = hit.getSourceAsMap(); + + assert source != null; + long level = Long.parseLong(source.get("level").toString()); + + MultiSearchRequest mSearchRequest = new MultiSearchRequest(); + for (Map.Entry> neighborEvent: neighborEvents.entrySet()) { + for (String event: neighborEvent.getValue()) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.matchQuery( + "event1", event + )).must(QueryBuilders.matchQuery( + "event2", "" + )); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + mSearchRequest.add(searchRequest); + } + } + + client.multiSearch(mSearchRequest, new ActionListener<>() { + @Override + public void onResponse(MultiSearchResponse items) { + MultiSearchResponse.Item[] responses = items.getResponses(); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + long prevLevel = -1L; + long totalNeighbors = 0L; + for (MultiSearchResponse.Item response: responses) { + if (response.isFailure()) { + log.info(response.getFailureMessage()); + continue; + } + + if (response.getResponse().getHits().getTotalHits().value == 1) { + ++totalNeighbors; + + SearchHit hit = response.getResponse().getHits().getHits()[0]; + Map source = hit.getSourceAsMap(); + + assert source != null; + long neighborLevel = Long.parseLong(source.get("level").toString()); + String correlatedEvent = source.get("event1").toString(); + String correlatedIndex = source.get("index1").toString(); + + try { + float[] corrVector = new float[3]; + if (level != prevLevel) { + for (int i = 0; i < 2; ++i) { + corrVector[i] = ((float) level) - 50.0f; + } + corrVector[0] = (float) level; + corrVector[2] = timestampFeature; + + Correlation event = new Correlation( + false, + level, + request.getEvent(), + "", + corrVector, + eventTimestamp, + request.getIndex(), + "", + request.getTags(), + 0L + ); + + IndexRequest indexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .source(event.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)); + bulkRequest.add(indexRequest); + } + + corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { + corrVector[i] = ((float) level) - 50.0f; + } + corrVector[0] = (2.0f * ((float) level) - 50.0f) / 2.0f; + corrVector[1] = (2.0f * ((float) neighborLevel) - 50.0f) / 2.0f; + corrVector[2] = timestampFeature; + + Correlation event = new Correlation( + false, + (long) ((2.0f * ((float) level) - 50.0f) / 2.0f), + request.getEvent(), + correlatedEvent, + corrVector, + eventTimestamp, + request.getIndex(), + correlatedIndex, + request.getTags(), + 0L + ); + + IndexRequest indexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .source(event.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)); + bulkRequest.add(indexRequest); + } catch (IOException ex) { + onFailures(ex); + } + prevLevel = level; + } + } + + if (totalNeighbors > 0L) { + client.bulk(bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (response.hasFailures()) { + onFailures(new OpenSearchStatusException("Correlation of finding failed", RestStatus.INTERNAL_SERVER_ERROR)); + } + onOperation(); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + insertOrphanEvents(timestampFeature); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void insertOrphanEvents(float timestampFeature) { + long eventTimestamp = request.getTimestamp(); + + MatchQueryBuilder queryBuilder = QueryBuilders.matchQuery( + "root", true + ); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + if (response.getHits().getTotalHits().value != 1) { + onFailures(new OpenSearchStatusException("Root Record not found", RestStatus.INTERNAL_SERVER_ERROR)); + } + + SearchHit hit = response.getHits().getHits()[0]; + Map source = hit.getSourceAsMap(); + String id = hit.getId(); + + assert source != null; + long level = Long.parseLong(source.get("level").toString()); + long timestamp = Long.parseLong(source.get("timestamp").toString()); + + try { + if (level == 0L) { + updateRootRecord(id, 50L, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.OK)) { + try { + float[] corrVector = new float[3]; + corrVector[0] = 50.0f; + corrVector[2] = timestampFeature; + + storeEvents(50L, corrVector); + } catch (IOException ex) { + onFailures(ex); + } + } else { + onFailures(new OpenSearchStatusException("Root Record not updated", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + if (eventTimestamp - timestamp > correlationTimeWindow) { + updateRootRecord(id, 50L, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.OK)) { + try { + float[] corrVector = new float[3]; + corrVector[0] = 50.0f; + corrVector[2] = timestampFeature; + + storeEvents(50L, corrVector); + } catch (IOException ex) { + onFailures(ex); + } + } else { + onFailures(new OpenSearchStatusException("Root Record not updated", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } else { + float[] query = new float[3]; + for (int i = 0; i < 2; ++i) { + query[i] = (2.0f * ((float) level) - 50.0f) / 2.0f; + } + query[2] = timestampFeature; + + CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, 3, QueryBuilders.boolQuery() + .mustNot(QueryBuilders.matchQuery( + "event1", "" + )).mustNot(QueryBuilders.matchQuery( + "event2", "" + )).filter(QueryBuilders.rangeQuery("timestamp") + .gte(eventTimestamp - correlationTimeWindow) + .lte(eventTimestamp + correlationTimeWindow))); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(correlationQueryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(Correlation.CORRELATION_HISTORY_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); + } + + long totalHits = response.getHits().getTotalHits().value; + SearchHit hit = totalHits > 0? response.getHits().getHits()[0]: null; + long existLevel = 0L; + + if (hit != null) { + Map source = hit.getSourceAsMap(); + assert source != null; + existLevel = Long.parseLong(source.get("level").toString()); + } + + try { + if (totalHits == 0L || existLevel != ((long) (2.0f * ((float) level) - 50.0f) / 2.0f)) { + float[] corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { + corrVector[i] = ((float) level) - 50.0f; + } + corrVector[0] = (float) level; + corrVector[2] = timestampFeature; + + storeEvents(level, corrVector); + } else { + updateRootRecord(id, level + 50L, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.OK)) { + try { + float[] corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { + corrVector[i] = (float) level; + } + corrVector[0] = level + 50.0f; + corrVector[2] = timestampFeature; + + storeEvents(level + 50L, corrVector); + } catch (IOException ex) { + onFailures(ex); + } + } + } + + @Override + public void onFailure(Exception e) { + + } + }); + } + } catch (IOException ex) { + onFailures(ex); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + } + } catch (IOException ex) { + onFailures(ex); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void updateRootRecord(String id, Long level, ActionListener listener) throws IOException { + Correlation rootRecord = new Correlation( + id, + 1L, + true, + level, + "", + "", + new float[]{0.0f, 0.0f, 0.0f}, + request.getTimestamp(), + "", + "", + List.of(), + 0L + ); + + IndexRequest indexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .id(id) + .source(rootRecord.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.index(indexRequest, listener); + } + + private void storeEvents(Long level, float[] correlationVector) throws IOException { + Correlation event = new Correlation( + false, + level, + request.getEvent(), + "", + correlationVector, + request.getTimestamp(), + request.getIndex(), + "", + request.getTags(), + 0L + ); + + IndexRequest indexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .source(event.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.CREATED)) { + onOperation(); + } else { + onFailures(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + private void onOperation() { + finishHim(null); + } + + private void onFailures(Exception t) { + finishHim(t); + } + + private void finishHim(Exception t) { + if (t != null) { + listener.onFailure(t); + } else { + listener.onResponse(new StoreCorrelationResponse(RestStatus.OK)); + } + } + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/settings/EventsCorrelationSettings.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/settings/EventsCorrelationSettings.java index 2e2dbbffbeaa2..e68e78d528b98 100644 --- a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/settings/EventsCorrelationSettings.java +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/settings/EventsCorrelationSettings.java @@ -30,6 +30,11 @@ public class EventsCorrelationSettings { * Boolean setting to check if an OS index is a correlation index. */ public static final Setting IS_CORRELATION_INDEX_SETTING = Setting.boolSetting(CORRELATION_INDEX, false, IndexScope); + public static final Setting CORRELATION_HISTORY_INDEX_SHARDS = Setting.intSetting( + "plugins.events_correlation.correlation_history_index_shards", + 1, + Setting.Property.NodeScope, Setting.Property.Dynamic + ); /** * Global time window setting for Correlations */ diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/CorrelationIndices.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/CorrelationIndices.java new file mode 100644 index 0000000000000..a9cc3318cdce1 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/CorrelationIndices.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.correlation.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.correlation.events.model.Correlation; +import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Objects; + +public class CorrelationIndices { + + private static final Logger log = LogManager.getLogger(CorrelationIndices.class); + public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L; + + private final Client client; + + private final ClusterService clusterService; + + private final Settings settings; + + private volatile int noOfShards; + + public CorrelationIndices(Client client, ClusterService clusterService, Settings settings) { + this.client = client; + this.clusterService = clusterService; + this.settings = settings; + this.noOfShards = EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS.get(this.settings); + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS, it -> noOfShards = it); + } + + public static String correlationMappings() throws IOException { + return new String(Objects.requireNonNull(CorrelationIndices.class.getClassLoader().getResourceAsStream("mappings/correlation.json")).readAllBytes(), Charset.defaultCharset()); + } + + public void initCorrelationIndex(ActionListener actionListener) throws IOException { + if (!correlationIndexExists()) { + CreateIndexRequest indexRequest = new CreateIndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .mapping(correlationMappings()) + .settings(Settings.builder().put("index.hidden", true).put("number_of_shards", noOfShards).put("index.correlation", true).build()); + client.admin().indices().create(indexRequest, actionListener); + } + } + + public boolean correlationIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.getRoutingTable().hasIndex(Correlation.CORRELATION_HISTORY_INDEX); + } + + public void setupCorrelationIndex(Long setupTimestamp, ActionListener listener) throws IOException { + long currentTimestamp = System.currentTimeMillis(); + + Correlation rootRecord = new Correlation( + true, + 0L, + "", + "", + new float[]{0.0f, 0.0f, 0.0f}, + currentTimestamp, + "", + "", + List.of(), + 0L); + IndexRequest indexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .source(rootRecord.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)); + + Correlation scoreRootRecord = new Correlation( + false, + 0L, + "", + "", + new float[]{0.0f, 0.0f, 0.0f}, + 0L, + "", + "", + List.of(), + setupTimestamp + ); + IndexRequest scoreIndexRequest = new IndexRequest(Correlation.CORRELATION_HISTORY_INDEX) + .source(scoreRootRecord.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequest); + bulkRequest.add(scoreIndexRequest); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.bulk(bulkRequest, listener); + } +} diff --git a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/IndexUtils.java b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/IndexUtils.java index a240fa17d1d22..a047ada842769 100644 --- a/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/IndexUtils.java +++ b/plugins/events-correlation-engine/src/main/java/org/opensearch/plugin/correlation/utils/IndexUtils.java @@ -40,6 +40,7 @@ public class IndexUtils { * manages the mappings lifecycle for correlation rule index */ public static Boolean correlationRuleIndexUpdated = false; + public static Boolean correlationHistoryIndexUpdated = false; private IndexUtils() {} @@ -50,6 +51,10 @@ public static void correlationRuleIndexUpdated() { correlationRuleIndexUpdated = true; } + public static void correlationHistoryIndexUpdated() { + correlationHistoryIndexUpdated = true; + } + /** * util method which decides based on schema version whether to update an index. * @param index IndexMetadata diff --git a/plugins/events-correlation-engine/src/main/resources/mappings/correlation.json b/plugins/events-correlation-engine/src/main/resources/mappings/correlation.json new file mode 100644 index 0000000000000..008366149d0d7 --- /dev/null +++ b/plugins/events-correlation-engine/src/main/resources/mappings/correlation.json @@ -0,0 +1,50 @@ +{ + "_meta" : { + "schema_version": 1 + }, + "properties": { + "root": { + "type": "boolean" + }, + "level":{ + "type": "long" + }, + "event1":{ + "type": "keyword" + }, + "event2":{ + "type": "keyword" + }, + "corr_vector": { + "type": "correlation_vector", + "dimension": 3, + "correlation_ctx": { + "similarityFunction": "EUCLIDEAN", + "parameters": { + "m": 16, + "ef_construction": 128 + } + } + }, + "timestamp":{ + "type": "long" + }, + "index1": { + "type": "keyword" + }, + "index2": { + "type": "keyword" + }, + "tags": { + "type": "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "score_timestamp": { + "type": "long" + } + } +}