Skip to content

Commit

Permalink
rule engine testing
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Mar 4, 2024
1 parent cad5e3a commit a2df5c7
Show file tree
Hide file tree
Showing 143 changed files with 5,934 additions and 21 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:failures-common')
implementation project(':data-prepper-plugins:rule-engine')
implementation libs.opensearch.client
implementation libs.opensearch.rhlc
implementation libs.opensearch.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;

Expand Down Expand Up @@ -45,26 +46,32 @@ public class BulkOperationWrapper {
private final EventHandle eventHandle;
private final BulkOperation bulkOperation;
private final SerializedJson jsonNode;
private final Event event;

public BulkOperationWrapper(final BulkOperation bulkOperation) {
this(bulkOperation, null, null);
this(bulkOperation, null, null, null);
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode) {
public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode, final Event event) {
checkNotNull(bulkOperation);
this.bulkOperation = bulkOperation;
this.eventHandle = eventHandle;
this.jsonNode = jsonNode;
this.event = event;
}

public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) {
this(bulkOperation, eventHandle, null);
this(bulkOperation, eventHandle, null, null);
}

public BulkOperation getBulkOperation() {
return bulkOperation;
}

public Event getEvent() {
return event;
}

public EventHandle getEventHandle() {
return eventHandle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation;
import org.opensearch.rest.RestStatus;
Expand All @@ -23,6 +24,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -185,21 +187,23 @@ private void incrementErrorCounters(final Exception e) {
}
}

public void execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException {
public List<EventWrapper> execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException {
final List<EventWrapper> eventWrappers = new ArrayList<>();

final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries);
BulkOperationRequestResponse operationResponse;
BulkResponse response = null;
AccumulatingBulkRequest request = bulkRequest;
int attempt = 1;
do {
operationResponse = handleRetry(request, response, attempt);
operationResponse = handleRetry(request, response, attempt, eventWrappers);
if (operationResponse != null) {
final long delayMillis = backoff.nextDelayMillis(attempt++);
request = operationResponse.getBulkRequest();
response = operationResponse.getResponse();
if (delayMillis < 0) {
RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries));
handleFailures(request, null, e);
handleFailures(request, null, e, eventWrappers);
break;
}
// Wait for backOff duration
Expand All @@ -210,6 +214,8 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte
}
}
} while (operationResponse != null);

return eventWrappers;
}

public boolean canRetry(final BulkResponse response) {
Expand All @@ -230,7 +236,8 @@ public static boolean canRetry(final Exception e) {
private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry,
final int retryCount,
final BulkResponse bulkResponse,
final Exception exceptionFromRequest) {
final Exception exceptionFromRequest,
final List<EventWrapper> eventWrappers) {
final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest);
if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
Expand All @@ -253,29 +260,31 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
} else {
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest, eventWrappers);
}
return null;
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure,
final List<EventWrapper> eventWrappers) {
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
// Skip logging the error for version conflicts
if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
}
}
handleFailures(bulkRequest, bulkResponse.items());
handleFailures(bulkRequest, bulkResponse.items(), eventWrappers);
} else {
LOG.warn("Bulk Operation Failed.", failure);
handleFailures(bulkRequest, failure);
}
bulkRequestFailedCounter.increment();
}

private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, int retryCount) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response);
private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, int retryCount,
final List<EventWrapper> eventWrappers) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response, eventWrappers);
if (bulkRequestForRetry.getOperationsCount() == 0) {
return null;
}
Expand All @@ -285,26 +294,30 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r
bulkResponse = requestFunction.apply(bulkRequestForRetry);
} catch (Exception e) {
incrementErrorCounters(e);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e, eventWrappers);
}
if (bulkResponse.errors()) {
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null);
return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null, eventWrappers);
} else {
final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
final boolean firstAttempt = (retryCount == 1);
if (firstAttempt) {
sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs);
}
sentDocumentsCounter.increment(bulkRequestForRetry.getOperationsCount());
for (final BulkOperationWrapper bulkOperation: bulkRequestForRetry.getOperations()) {
for (int i = 0; i < bulkResponse.items().size(); i++) {
final BulkOperationWrapper bulkOperation = bulkRequestForRetry.getOperationAt(i);
final BulkResponseItem bulkResponseItem = bulkResponse.items().get(i);

bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkResponseItem));
}
}
return null;
}

private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkRequestForRetry(
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response) {
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> request, final BulkResponse response, final List<EventWrapper> eventWrappers) {
if (response == null) {
// first attempt or retry due to Exception
return request;
Expand All @@ -322,6 +335,7 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -332,6 +346,7 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
} else {
sentDocumentsCounter.increment();
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
}
index++;
}
Expand All @@ -343,7 +358,8 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
}
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> accumulatingBulkRequest, final List<BulkResponseItem> itemResponses) {
private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> accumulatingBulkRequest, final List<BulkResponseItem> itemResponses,
final List<EventWrapper> eventWrappers) {
assert accumulatingBulkRequest.getOperationsCount() == itemResponses.size();
final ImmutableList.Builder<FailedBulkOperation> failures = ImmutableList.builder();
for (int i = 0; i < itemResponses.size(); i++) {
Expand All @@ -354,6 +370,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
} else {
failures.add(FailedBulkOperation.builder()
.withBulkOperation(bulkOperation)
Expand All @@ -364,6 +381,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
} else {
sentDocumentsCounter.increment();
bulkOperation.releaseEventHandle(true);
eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse));
}
}
logFailure.accept(failures.build(), null);
Expand All @@ -384,4 +402,8 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
logFailure.accept(failures.build(), failure);
}

private EventWrapper createEventWrapper(final BulkOperationWrapper bulkOperationWrapper, final BulkResponseItem bulkResponseItem) {
return new EventWrapper(bulkResponseItem.index(), bulkResponseItem.id(), bulkOperationWrapper.getEvent());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.opensearch.dataprepper.plugins.common.opensearch.ServerlessOptionsFactory;
import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
import org.opensearch.dataprepper.plugins.processor.RuleEngine;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig;
import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkApiWrapperFactory;
Expand Down Expand Up @@ -72,6 +75,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -139,6 +143,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
private final ConcurrentHashMap<Long, Long> lastFlushTimeMap;
private RuleEngine ruleEngine = null;

@DataPrepperPluginConstructor
public OpenSearchSink(final PluginSetting pluginSetting,
Expand Down Expand Up @@ -261,6 +266,10 @@ private void doInitializeInternal() throws IOException {
maybeUpdateServerlessNetworkPolicy();

objectMapper = new ObjectMapper();

final Optional<RuleEngineConfig> ruleEngineConfig = openSearchSinkConfig.getRuleEngineConfig();
ruleEngineConfig.ifPresent(engineConfig -> ruleEngine = new RuleEngine(engineConfig, openSearchClient));

this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -440,7 +449,7 @@ public void doOutput(final Collection<Record<Event>> records) {
continue;
}

BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode);
BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode, event);
final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
Expand Down Expand Up @@ -494,7 +503,10 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) {
bulkRequestTimer.record(() -> {
try {
LOG.debug("Sending data to OpenSearch");
bulkRetryStrategy.execute(accumulatingBulkRequest);
final List<EventWrapper> eventWrappers = bulkRetryStrategy.execute(accumulatingBulkRequest);
if (ruleEngine != null) {
ruleEngine.doExecute(eventWrappers);
}
bulkRequestSizeBytesSummary.record(accumulatingBulkRequest.getEstimatedSizeInBytes());
} catch (final InterruptedException e) {
LOG.error("Unexpected Interrupt:", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig;
import org.opensearch.dataprepper.plugins.processor.RuleEngineConfigWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkNotNull;

public class OpenSearchSinkConfiguration {

private final ConnectionConfiguration connectionConfiguration;
private final IndexConfiguration indexConfiguration;
private final RetryConfiguration retryConfiguration;
private final RuleEngineConfigWrapper ruleEngineConfigWrapper;

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
Expand All @@ -29,15 +34,21 @@ public RetryConfiguration getRetryConfiguration() {
return retryConfiguration;
}

public Optional<RuleEngineConfig> getRuleEngineConfig() {
return Optional.ofNullable(ruleEngineConfigWrapper.getRuleEngineConfig());
}

private OpenSearchSinkConfiguration(
final ConnectionConfiguration connectionConfiguration, final IndexConfiguration indexConfiguration,
final RetryConfiguration retryConfiguration) {
final RetryConfiguration retryConfiguration, final RuleEngineConfigWrapper ruleEngineConfigWrapper) {
checkNotNull(connectionConfiguration, "connectionConfiguration cannot be null");
checkNotNull(indexConfiguration, "indexConfiguration cannot be null");
checkNotNull(retryConfiguration, "retryConfiguration cannot be null");
checkNotNull(ruleEngineConfigWrapper, "ruleEngineConfigWrapper cannot be null");
this.connectionConfiguration = connectionConfiguration;
this.indexConfiguration = indexConfiguration;
this.retryConfiguration = retryConfiguration;
this.ruleEngineConfigWrapper = ruleEngineConfigWrapper;
}

public static OpenSearchSinkConfiguration readESConfig(final PluginSetting pluginSetting) {
Expand All @@ -49,7 +60,8 @@ public static OpenSearchSinkConfiguration readESConfig(final PluginSetting plugi
ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator);
final RetryConfiguration retryConfiguration = RetryConfiguration.readRetryConfig(pluginSetting);
final RuleEngineConfigWrapper ruleEngineConfigWrapper = RuleEngineConfigWrapper.readRuleEngineConfigWrapper(pluginSetting);

return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration);
return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration, ruleEngineConfigWrapper);
}
}
Loading

0 comments on commit a2df5c7

Please sign in to comment.