Skip to content

Commit

Permalink
add joins, insertion & search to correlation engine
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Jun 26, 2023
1 parent c29e4aa commit 20e81ad
Show file tree
Hide file tree
Showing 29 changed files with 4,145 additions and 8 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
Expand Down Expand Up @@ -102,6 +103,67 @@ public void testCreatingACorrelationRuleWithNoTimestampField() throws IOExceptio
);
}

@SuppressWarnings("unchecked")
public void testCorrelationWithSingleRule() throws IOException {
String windowsIndex = "windows";
Request request = new Request("PUT", "/" + windowsIndex);
request.setJsonEntity(windowsMappings());
client().performRequest(request);

String appLogsIndex = "app_logs";
request = new Request("PUT", "/" + appLogsIndex);
request.setJsonEntity(appLogMappings());
client().performRequest(request);

String correlationRule = windowsToAppLogsCorrelationRule();
request = new Request("POST", "/_correlation/rules");
request.setJsonEntity(correlationRule);
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", windowsIndex));
request.setJsonEntity(sampleWindowsEvent());
client().performRequest(request);

request = new Request("POST", String.format(Locale.ROOT, "/%s/_doc?refresh", appLogsIndex));
request.setJsonEntity(sampleAppLogsEvent());
Response response = client().performRequest(request);
String appLogsId = responseAsMap(response).get("_id").toString();

request = new Request("POST", "/_correlation/events");
request.setJsonEntity(prepareCorrelateEventRequest(appLogsIndex, appLogsId));
response = client().performRequest(request);
Map<String, Object> responseAsMap = responseAsMap(response);
Assert.assertEquals(1, ((Map<String, Object>) responseAsMap.get("neighbor_events")).size());
}

private String prepareCorrelateEventRequest(String index, String event) {
return "{\n" + " \"index\": \"" + index + "\",\n" + " \"event\": \"" + event + "\",\n" + " \"store\": false\n" + "}";
}

private String windowsToAppLogsCorrelationRule() {
return "{\n"
+ " \"name\": \"windows to app logs\",\n"
+ " \"correlate\": [\n"
+ " {\n"
+ " \"index\": \"windows\",\n"
+ " \"query\": \"host.hostname:EC2AMAZ*\",\n"
+ " \"timestampField\": \"winlog.timestamp\",\n"
+ " \"tags\": [\n"
+ " \"windows\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"index\": \"app_logs\",\n"
+ " \"query\": \"endpoint:\\\\/customer_records.txt\",\n"
+ " \"timestampField\": \"timestamp\",\n"
+ " \"tags\": [\n"
+ " \"others_application\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}";
}

private String sampleCorrelationRule() {
return "{\n"
+ " \"name\": \"s3 to app logs\",\n"
Expand Down Expand Up @@ -151,4 +213,115 @@ private String sampleCorrelationRuleWithNoTimestamp() {
private String matchIdQuery(String id) {
return "{\n" + " \"query\" : {\n" + " \"match\":{\n" + " \"_id\": \"" + id + "\"\n" + " }\n" + " }\n" + "}";
}

private String windowsMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"server.user.hash\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"host.hostname\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"windows.message\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.provider_name\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.event_data.ServiceName\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"winlog.timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String appLogMappings() {
return "{"
+ " \"settings\": {"
+ " \"number_of_shards\": 1"
+ " },"
+ " \"mappings\": {"
+ " \"properties\": {\n"
+ " \"http_method\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"endpoint\": {\n"
+ " \"type\": \"text\",\n"
+ " \"analyzer\": \"whitespace\""
+ " },\n"
+ " \"keywords\": {\n"
+ " \"type\": \"text\"\n"
+ " },\n"
+ " \"timestamp\": {\n"
+ " \"type\": \"long\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}";
}

private String sampleWindowsEvent() {
return "{\n"
+ " \"EventTime\": \"2020-02-04T14:59:39.343541+00:00\",\n"
+ " \"host.hostname\": \"EC2AMAZEPO7HKA\",\n"
+ " \"Keywords\": \"9223372036854775808\",\n"
+ " \"SeverityValue\": 2,\n"
+ " \"Severity\": \"INFO\",\n"
+ " \"winlog.event_id\": 22,\n"
+ " \"SourceName\": \"Microsoft-Windows-Sysmon\",\n"
+ " \"ProviderGuid\": \"{5770385F-C22A-43E0-BF4C-06F5698FFBD9}\",\n"
+ " \"Version\": 5,\n"
+ " \"TaskValue\": 22,\n"
+ " \"OpcodeValue\": 0,\n"
+ " \"RecordNumber\": 9532,\n"
+ " \"ExecutionProcessID\": 1996,\n"
+ " \"ExecutionThreadID\": 2616,\n"
+ " \"Channel\": \"Microsoft-Windows-Sysmon/Operational\",\n"
+ " \"winlog.event_data.SubjectDomainName\": \"NTAUTHORITY\",\n"
+ " \"AccountName\": \"SYSTEM\",\n"
+ " \"UserID\": \"S-1-5-18\",\n"
+ " \"AccountType\": \"User\",\n"
+ " \"windows.message\": \"Dns query:\\r\\nRuleName: \\r\\nUtcTime: 2020-02-04 14:59:38.349\\r\\nProcessGuid: {b3c285a4-3cda-5dc0-0000-001077270b00}\\r\\nProcessId: 1904\\r\\nQueryName: EC2AMAZ-EPO7HKA\\r\\nQueryStatus: 0\\r\\nQueryResults: 172.31.46.38;\\r\\nImage: C:\\\\Program Files\\\\nxlog\\\\nxlog.exe\",\n"
+ " \"Category\": \"Dns query (rule: DnsQuery)\",\n"
+ " \"Opcode\": \"Info\",\n"
+ " \"UtcTime\": \"2020-02-04 14:59:38.349\",\n"
+ " \"ProcessGuid\": \"{b3c285a4-3cda-5dc0-0000-001077270b00}\",\n"
+ " \"ProcessId\": \"1904\",\n"
+ " \"QueryName\": \"EC2AMAZ-EPO7HKA\",\n"
+ " \"QueryStatus\": \"0\",\n"
+ " \"QueryResults\": \"172.31.46.38;\",\n"
+ " \"Image\": \"C:\\\\Program Files\\\\nxlog\\\\regsvr32.exe\",\n"
+ " \"EventReceivedTime\": \"2020-02-04T14:59:40.780905+00:00\",\n"
+ " \"SourceModuleName\": \"in\",\n"
+ " \"SourceModuleType\": \"im_msvistalog\",\n"
+ " \"CommandLine\": \"eachtest\",\n"
+ " \"Initiated\": \"true\",\n"
+ " \"winlog.timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}

private String sampleAppLogsEvent() {
return "{\n"
+ " \"endpoint\": \"/customer_records.txt\",\n"
+ " \"http_method\": \"POST\",\n"
+ " \"keywords\": \"PermissionDenied\",\n"
+ " \"timestamp\": "
+ System.currentTimeMillis()
+ "\n"
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@
import org.opensearch.plugin.correlation.core.index.mapper.CorrelationVectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.mapper.VectorFieldMapper;
import org.opensearch.plugin.correlation.core.index.query.CorrelationQueryBuilder;
import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction;
import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.resthandler.RestSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.transport.TransportSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportStoreCorrelationAction;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.resthandler.RestIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.transport.TransportIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings;
import org.opensearch.plugin.correlation.utils.CorrelationIndices;
import org.opensearch.plugin.correlation.utils.CorrelationRuleIndices;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
Expand Down Expand Up @@ -67,9 +76,12 @@ public class EventsCorrelationPlugin extends Plugin implements ActionPlugin, Map
* events-correlation-engine rules uri
*/
public static final String CORRELATION_RULES_BASE_URI = PLUGINS_BASE_URI + "/rules";
public static final String CORRELATION_EVENTS_BASE_URI = PLUGINS_BASE_URI + "/events";

private CorrelationRuleIndices correlationRuleIndices;

private CorrelationIndices correlationIndices;

/**
* Default constructor
*/
Expand All @@ -90,7 +102,8 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
return List.of(correlationRuleIndices);
correlationIndices = new CorrelationIndices(client, clusterService, clusterService.getSettings());
return List.of(correlationRuleIndices, correlationIndices);
}

@Override
Expand All @@ -103,7 +116,7 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestIndexCorrelationRuleAction());
return List.of(new RestIndexCorrelationRuleAction(), new RestSearchCorrelatedEventsAction(), new RestIndexCorrelationAction());
}

@Override
Expand Down Expand Up @@ -132,11 +145,20 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class));
return List.of(
new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class),
new ActionPlugin.ActionHandler<>(IndexCorrelationAction.INSTANCE, TransportIndexCorrelationAction.class),
new ActionPlugin.ActionHandler<>(StoreCorrelationAction.INSTANCE, TransportStoreCorrelationAction.class),
new ActionPlugin.ActionHandler<>(SearchCorrelatedEventsAction.INSTANCE, TransportSearchCorrelatedEventsAction.class)
);
}

@Override
public List<Setting<?>> getSettings() {
return List.of(EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, EventsCorrelationSettings.CORRELATION_TIME_WINDOW);
return List.of(
EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING,
EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS,
EventsCorrelationSettings.CORRELATION_TIME_WINDOW
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

/**
* Transport Action for indexing correlations
*
* @opensearch.internal
*/
public class IndexCorrelationAction extends ActionType<IndexCorrelationResponse> {

/**
* Instance of IndexCorrelationAction
*/
public static final IndexCorrelationAction INSTANCE = new IndexCorrelationAction();
/**
* Name of IndexCorrelationAction
*/
public static final String NAME = "cluster:admin/index/correlation/events";

private IndexCorrelationAction() {
super(NAME, IndexCorrelationResponse::new);
}
}
Loading

0 comments on commit 20e81ad

Please sign in to comment.