Skip to content

Commit

Permalink
add joins, insertion & search to correlation engine
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed May 26, 2023
1 parent adf7e2c commit 6bc37f0
Show file tree
Hide file tree
Showing 21 changed files with 3,168 additions and 3 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugin.correlation.events.action.IndexCorrelationAction;
import org.opensearch.plugin.correlation.events.action.SearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.action.StoreCorrelationAction;
import org.opensearch.plugin.correlation.events.transport.TransportIndexCorrelationAction;
import org.opensearch.plugin.correlation.events.transport.TransportSearchCorrelatedEventsAction;
import org.opensearch.plugin.correlation.events.transport.TransportStoreCorrelationAction;
import org.opensearch.plugin.correlation.rules.action.IndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.resthandler.RestIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.rules.transport.TransportIndexCorrelationRuleAction;
import org.opensearch.plugin.correlation.settings.EventsCorrelationSettings;
import org.opensearch.plugin.correlation.utils.CorrelationIndices;
import org.opensearch.plugin.correlation.utils.CorrelationRuleIndices;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -57,6 +64,8 @@ public class EventsCorrelationPlugin extends Plugin implements ActionPlugin {

private CorrelationRuleIndices correlationRuleIndices;

private CorrelationIndices correlationIndices;

/**
* Default constructor
*/
Expand All @@ -77,7 +86,8 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
correlationRuleIndices = new CorrelationRuleIndices(client, clusterService);
return List.of(correlationRuleIndices);
correlationIndices = new CorrelationIndices(client, clusterService, clusterService.getSettings());
return List.of(correlationRuleIndices, correlationIndices);
}

@Override
Expand All @@ -95,11 +105,18 @@ public List<RestHandler> getRestHandlers(

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class));
return List.of(new ActionPlugin.ActionHandler<>(IndexCorrelationRuleAction.INSTANCE, TransportIndexCorrelationRuleAction.class),
new ActionPlugin.ActionHandler<>(IndexCorrelationAction.INSTANCE, TransportIndexCorrelationAction.class),
new ActionPlugin.ActionHandler<>(StoreCorrelationAction.INSTANCE, TransportStoreCorrelationAction.class),
new ActionPlugin.ActionHandler<>(SearchCorrelatedEventsAction.INSTANCE, TransportSearchCorrelatedEventsAction.class));
}

@Override
public List<Setting<?>> getSettings() {
return List.of(EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING, EventsCorrelationSettings.CORRELATION_TIME_WINDOW);
return List.of(
EventsCorrelationSettings.IS_CORRELATION_INDEX_SETTING,
EventsCorrelationSettings.CORRELATION_HISTORY_INDEX_SHARDS,
EventsCorrelationSettings.CORRELATION_TIME_WINDOW
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

public class IndexCorrelationAction extends ActionType<IndexCorrelationResponse> {

public static final IndexCorrelationAction INSTANCE = new IndexCorrelationAction();
public static final String NAME = "cluster:admin/index/correlation/events";

private IndexCorrelationAction() {
super(NAME, IndexCorrelationResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class IndexCorrelationRequest extends ActionRequest {

private String index;

private String event;

private Boolean store;

public IndexCorrelationRequest(String index, String event, Boolean store) {
super();
this.index = index;
this.event = event;
this.store = store;
}

public IndexCorrelationRequest(StreamInput sin) throws IOException {
this(sin.readString(), sin.readString(), sin.readBoolean());
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(event);
out.writeBoolean(store);
}

public String getIndex() {
return index;
}

public String getEvent() {
return event;
}

public Boolean getStore() {
return store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class IndexCorrelationResponse extends ActionResponse {

private Boolean isOrphan;

private Map<String, List<String>> neighborEvents;

private RestStatus status;

public IndexCorrelationResponse(Boolean isOrphan, Map<String, List<String>> neighborEvents, RestStatus status) {
super();
this.isOrphan = isOrphan;
this.neighborEvents = neighborEvents;
this.status = status;
}

public IndexCorrelationResponse(StreamInput sin) throws IOException {
this(
sin.readBoolean(),
sin.readMap(StreamInput::readString, StreamInput::readStringList),
sin.readEnum(RestStatus.class)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isOrphan);
out.writeMap(neighborEvents, StreamOutput::writeString, StreamOutput::writeStringCollection);
out.writeEnum(status);
}

public RestStatus getStatus() {
return status;
}

public Boolean getOrphan() {
return isOrphan;
}

public Map<String, List<String>> getNeighborEvents() {
return neighborEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

public class SearchCorrelatedEventsAction extends ActionType<SearchCorrelatedEventsResponse> {

public static final SearchCorrelatedEventsAction INSTANCE = new SearchCorrelatedEventsAction();
public static final String NAME = "cluster:admin/search/correlation/events";

private SearchCorrelatedEventsAction() {
super(NAME, SearchCorrelatedEventsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class SearchCorrelatedEventsRequest extends ActionRequest {

private String index;

private String event;

private String timestampField;

private Long timeWindow;

private Integer nearbyEvents;

public SearchCorrelatedEventsRequest(String index, String event, String timestampField, Long timeWindow, Integer nearbyEvents) {
super();
this.index = index;
this.event = event;
this.timestampField = timestampField;
this.timeWindow = timeWindow;
this.nearbyEvents = nearbyEvents;
}

public SearchCorrelatedEventsRequest(StreamInput sin) throws IOException {
this(
sin.readString(),
sin.readString(),
sin.readString(),
sin.readLong(),
sin.readInt()
);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(event);
out.writeString(timestampField);
out.writeLong(timeWindow);
out.writeInt(nearbyEvents);
}

public String getIndex() {
return index;
}

public String getEvent() {
return event;
}

public String getTimestampField() {
return timestampField;
}

public Long getTimeWindow() {
return timeWindow;
}

public Integer getNearbyEvents() {
return nearbyEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.plugin.correlation.events.model.EventWithScore;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class SearchCorrelatedEventsResponse extends ActionResponse implements ToXContentObject {

private List<EventWithScore> events;

private RestStatus status;

private static final String EVENTS = "events";

public SearchCorrelatedEventsResponse(List<EventWithScore> events, RestStatus status) {
super();
this.events = events;
this.status = status;
}

public SearchCorrelatedEventsResponse(StreamInput sin) throws IOException {
this(
Collections.unmodifiableList(sin.readList(EventWithScore::new)),
sin.readEnum(RestStatus.class)
);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(EVENTS, events)
.endObject();
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(events);
out.writeEnum(status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.correlation.events.action;

import org.opensearch.action.ActionType;

public class StoreCorrelationAction extends ActionType<StoreCorrelationResponse> {

public static final StoreCorrelationAction INSTANCE = new StoreCorrelationAction();
public static final String NAME = "cluster:admin/store/correlation/events";

private StoreCorrelationAction() {
super(NAME, StoreCorrelationResponse::new);
}
}
Loading

0 comments on commit 6bc37f0

Please sign in to comment.