Skip to content

Commit

Permalink
Add expiry monitor to adjust SQS viz timeout if processing has not ye…
Browse files Browse the repository at this point in the history
…t completed

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 30, 2023
1 parent ca3d6ac commit bb37905
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Instant;

/**
* AcknowledgmentSet keeps track of set of events that
* belong to the batch of events that a source creates.
Expand Down Expand Up @@ -58,4 +60,10 @@ public interface AcknowledgementSet {
* initial events are going through the pipeline line.
*/
public void complete();

/**
 * Returns the instant at which this acknowledgement set expires.
 *
 * @return the current expiry time
 */
Instant getExpiryTime();

/**
 * Replaces the instant at which this acknowledgement set expires.
 *
 * @param expiryTime the new expiry time
 */
void setExpiryTime(final Instant expiryTime);

/**
 * Reports whether this acknowledgement set is done.
 * NOTE(review): the exact criteria are defined by the implementation and are
 * not fully visible here - presumably "completed or expired"; confirm.
 *
 * @return true if the set is done, false otherwise
 */
boolean isDone();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;

/**
Expand All @@ -30,6 +31,8 @@ public interface AcknowledgementSetManager {
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

/**
 * Registers an expiry item for monitoring.
 * Implementations that do not support acknowledgements may throw
 * {@link UnsupportedOperationException}.
 *
 * @param expiryItem the item to monitor
 */
void addExpiryMonitor(final ExpiryItem expiryItem);

/**
* Releases an event's reference
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.opensearch.dataprepper.model.acknowledgements;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;

/**
 * Pairs an {@link AcknowledgementSet} with a callback that is invoked periodically
 * while the set is still outstanding, and tracks an expiration time that is pushed
 * forward by {@code pollSeconds} each time {@link #updateExpirationTime()} is called.
 */
public class ExpiryItem {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiryItem.class);

    private final String itemId;
    private final Instant startTime;
    private final long pollSeconds;
    private final Consumer<ExpiryItem> expiryCallback;
    private final AcknowledgementSet acknowledgementSet;
    private Instant expirationTime;

    /**
     * Creates an expiry item.
     *
     * @param itemId             identifier used in log messages
     * @param startTime          reference instant used by {@link #getSecondsBetweenStartAndExpiration()}
     * @param pollSeconds        number of seconds added to the expiration on every update; must be at least 3
     * @param expirationTime     initial expiration instant
     * @param expiryCallback     callback run by {@link #executeExpiryCallback()}; receives this item
     * @param acknowledgementSet acknowledgement set whose expiry time is kept in sync with this item
     * @throws UnsupportedOperationException if {@code pollSeconds} is 2 or less
     */
    public ExpiryItem(final String itemId, final Instant startTime, final long pollSeconds, final Instant expirationTime,
                      final Consumer<ExpiryItem> expiryCallback, final AcknowledgementSet acknowledgementSet) {
        if (pollSeconds <= 2) {
            throw new UnsupportedOperationException("The poll interval must be at least 3 seconds to enable expiry monitoring");
        }
        this.itemId = itemId;
        this.startTime = startTime;
        this.pollSeconds = pollSeconds;
        this.expirationTime = expirationTime;
        this.expiryCallback = expiryCallback;
        this.acknowledgementSet = acknowledgementSet;
    }

    /**
     * @return the poll interval, in seconds, supplied at construction
     */
    public long getPollSeconds() {
        return pollSeconds;
    }

    /**
     * @return the identifier supplied at construction
     */
    public String getItemId() {
        return itemId;
    }

    /**
     * Runs the expiry callback with this item as its argument.
     *
     * @return true if the callback returned normally, false if it threw an exception
     */
    public boolean executeExpiryCallback() {
        try {
            expiryCallback.accept(this);
            return true;
        } catch (final Exception e) {
            // Pass the throwable itself so the logger records the cause and stack trace;
            // a bare message string without a placeholder would be dropped.
            LOG.error("Exception occurred when executing the expiry callback", e);
            return false;
        }
    }

    /**
     * @return true if the acknowledgement set reports done, or the current time is past the expiration time
     */
    public boolean isCompleteOrExpired() {
        return acknowledgementSet.isDone() || Instant.now().isAfter(expirationTime);
    }

    /**
     * Extends the expiration time by {@code pollSeconds} and propagates the new value
     * to the acknowledgement set.
     */
    public void updateExpirationTime() {
        // Compute the new value first so the log line reports the time being set, not the previous one.
        final Instant newExpirationTime = expirationTime.plusSeconds(pollSeconds);
        LOG.info("Updating expiration time for item ID {} to {}", itemId, newExpirationTime);
        expirationTime = newExpirationTime;
        acknowledgementSet.setExpiryTime(newExpirationTime);
    }

    /**
     * @return whole seconds between the start time and the current expiration time
     */
    public long getSecondsBetweenStartAndExpiration() {
        return Duration.between(startTime, expirationTime).getSeconds();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class DefaultAcknowledgementSet implements AcknowledgementSet {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class);
private final Consumer<Boolean> callback;
private final Instant expiryTime;
private Instant expiryTime;
private final ExecutorService executor;
// This lock protects all the non-final members
private final ReentrantLock lock;
Expand All @@ -36,7 +36,8 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
private final DefaultAcknowledgementSetMetrics metrics;
private boolean completed;

public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) {
public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime,
final DefaultAcknowledgementSetMetrics metrics) {
this.callback = callback;
this.result = true;
this.executor = executor;
Expand Down Expand Up @@ -76,6 +77,7 @@ public void acquire(final EventHandle eventHandle) {
}
}

@Override
public boolean isDone() {
lock.lock();
try {
Expand All @@ -98,10 +100,16 @@ public boolean isDone() {
return false;
}

/**
 * Returns the current expiry time.
 * The field is mutable, so it is read under the lock that guards the non-final members.
 *
 * @return the current expiry time
 */
@Override
public Instant getExpiryTime() {
    lock.lock();
    try {
        return expiryTime;
    } finally {
        lock.unlock();
    }
}

/**
 * Replaces the expiry time.
 * The write happens under the lock that guards the non-final members so that
 * readers on other threads observe the updated value.
 *
 * @param expiryTime the new expiry time
 */
@Override
public void setExpiryTime(final Instant expiryTime) {
    lock.lock();
    try {
        this.expiryTime = expiryTime;
    } finally {
        lock.unlock();
    }
}

@Override
public void complete() {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -26,6 +27,7 @@ public class DefaultAcknowledgementSetManager implements AcknowledgementSetManag
private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread;
private PluginMetrics pluginMetrics;
private DefaultAcknowledgementSetMetrics metrics;
private ExpiryMonitor expiryMonitor = null;

@Inject
public DefaultAcknowledgementSetManager(
Expand All @@ -40,6 +42,7 @@ public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor,
acknowledgementSetMonitorThread.start();
pluginMetrics = PluginMetrics.fromNames("acknowledgementSetManager", "acknowledgements");
metrics = new DefaultAcknowledgementSetMetrics(pluginMetrics);

}

public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout) {
Expand All @@ -49,6 +52,15 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
return acknowledgementSet;
}

/**
 * Hands the given item to the expiry monitor, creating the monitor on first use.
 * NOTE(review): the lazy creation is not synchronized - confirm callers are single-threaded.
 *
 * @param expiryItem the item to monitor
 */
@Override
public void addExpiryMonitor(final ExpiryItem expiryItem) {
    if (expiryMonitor == null) {
        expiryMonitor = new ExpiryMonitor();
    }
    this.expiryMonitor.addExpiryItem(expiryItem);
}

/**
 * Acquires a reference for the given event by delegating to the
 * handle-based overload with the event's handle.
 *
 * @param event the event whose handle is acquired
 */
public void acquireEventReference(final Event event) {
    final EventHandle handle = event.getEventHandle();
    acquireEventReference(handle);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Schedules periodic expiry callbacks for {@link ExpiryItem}s on a shared
 * single-threaded scheduler, and periodically cancels the callbacks of items
 * that are complete or expired.
 *
 * NOTE(review): the scheduler and the item map are static, and every constructed
 * instance schedules an additional cleanup sweep on that shared scheduler.
 */
public class ExpiryMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(ExpiryMonitor.class);

    private static final long CANCEL_CHECK_INTERVAL_SECONDS = 15;
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
    private static final ConcurrentHashMap<ExpiryItem, ScheduledFuture<?>> EXPIRY_MONITORS = new ConcurrentHashMap<>();

    /**
     * Starts a cleanup sweep that runs every {@code CANCEL_CHECK_INTERVAL_SECONDS} seconds
     * and cancels/removes the scheduled callback of every item that is complete or expired.
     */
    public ExpiryMonitor() {
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
            EXPIRY_MONITORS.forEach((expiryItem, future) -> {
                // An exception escaping a fixed-rate task suppresses all later runs,
                // so a failure on one item must not end the sweep.
                try {
                    final boolean isCompleteOrExpired = expiryItem.isCompleteOrExpired();
                    // Routine status, emitted for every item on every sweep: not an error.
                    LOG.debug("ExpiryItem with ID {} has completion/expiry status {}", expiryItem.getItemId(), isCompleteOrExpired);

                    if (isCompleteOrExpired) {
                        future.cancel(false);
                        EXPIRY_MONITORS.remove(expiryItem);
                    }
                } catch (final Exception e) {
                    LOG.error("Exception occurred when checking ExpiryItem with ID {}", expiryItem.getItemId(), e);
                }
            });
        }, 0, CANCEL_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Schedules the item's expiry callback to run immediately and then every
     * {@code pollSeconds - 2} seconds, and records the resulting future so the
     * cleanup sweep can cancel it later.
     *
     * @param expiryItem the item to monitor
     */
    public void addExpiryItem(final ExpiryItem expiryItem) {
        final ScheduledFuture<?> expiryMonitor = SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(monitorExpiration(expiryItem),
                0, expiryItem.getPollSeconds() - 2, TimeUnit.SECONDS);
        EXPIRY_MONITORS.put(expiryItem, expiryMonitor);
    }

    /**
     * Builds the periodic task for one item: run the expiry callback and, only if
     * it succeeded, push the item's expiration time forward.
     */
    private Runnable monitorExpiration(final ExpiryItem expiryItem) {
        return () -> {
            final boolean callBackSuccess = expiryItem.executeExpiryCallback();
            if (callBackSuccess) {
                expiryItem.updateExpirationTime();
            }
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.acknowledgements;

import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand All @@ -26,6 +27,11 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
throw new UnsupportedOperationException("create operation not supported");
}

/**
 * Always rejects the request: this manager does not support expiry monitoring.
 *
 * @param expiryItem ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void addExpiryMonitor(final ExpiryItem expiryItem) {
    final String reason = "add expiry monitor operation not supported";
    throw new UnsupportedOperationException(reason);
}

/**
 * Always rejects the request: this manager does not support acquiring event references.
 *
 * @param event ignored
 * @throws UnsupportedOperationException always
 */
public void acquireEventReference(final Event event) {
    final String reason = "acquire operation not supported";
    throw new UnsupportedOperationException(reason);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem;
import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption;
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
Expand All @@ -26,6 +27,7 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
Expand All @@ -42,6 +44,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class SqsWorker implements Runnable {
Expand All @@ -52,6 +55,7 @@ public class SqsWorker implements Runnable {
static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed";
static final String SQS_MESSAGE_DELAY_METRIC_NAME = "sqsMessageDelay";
static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter";
static final long VISIBILITY_TIMEOUT_INCREASE_BUFFER = 15L;

private final S3SourceConfig s3SourceConfig;
private final SqsClient sqsClient;
Expand Down Expand Up @@ -105,7 +109,7 @@ public void run() {
try {
messagesProcessed = processSqsMessages();
} catch (final Exception e) {
LOG.error("Unable to process SQS messages. Processing error due to: {}", e.getMessage());
LOG.error("Unable to process SQS messages", e);
// There shouldn't be any exceptions caught here, but added backoff just to control the amount of logging in case of an exception is thrown.
applyBackoff();
}
Expand Down Expand Up @@ -236,6 +240,9 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
deleteSqsMessages(waitingForAcknowledgements);
}
}, Duration.ofSeconds(timeout));

final Instant expiryTime = Instant.now().plusMillis(Duration.ofSeconds(timeout).toMillis());
acknowledgementSetManager.addExpiryMonitor(getExpiryItem(parsedMessage.getMessage(), expiryTime, acknowledgementSet));
}
final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey());
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet);
Expand Down Expand Up @@ -336,6 +343,27 @@ private S3ObjectReference populateS3Reference(final String bucketName, final Str
.build();
}

/**
 * Builds the expiry item for one SQS message.
 * The configured visibility timeout (in seconds) serves as the item's poll interval,
 * and the item's start time is the expiration time minus that interval.
 *
 * @param message            the SQS message being processed
 * @param expirationTime     initial expiration instant for the item
 * @param acknowledgementSet acknowledgement set associated with the message
 * @return the expiry item; its constructor rejects poll intervals of 2 seconds or less
 */
private ExpiryItem getExpiryItem(final Message message, final Instant expirationTime, final AcknowledgementSet acknowledgementSet) {
    final long pollSeconds = sqsOptions.getVisibilityTimeout().getSeconds();
    final String messageId = message.messageId();
    final Instant startTime = expirationTime.minusSeconds(pollSeconds);
    final Consumer<ExpiryItem> onExpiry = getExpiryCallback(message);
    return new ExpiryItem(messageId, startTime, pollSeconds, expirationTime, onExpiry, acknowledgementSet);
}

/**
 * Creates the callback that re-applies the visibility timeout of the given SQS message.
 * When invoked, it sends a ChangeMessageVisibility request whose timeout is the
 * expiry item's poll interval (narrowed to int), using the message's receipt handle.
 *
 * @param message the SQS message whose visibility timeout is adjusted
 * @return a consumer that performs the visibility change for an expiry item
 */
private Consumer<ExpiryItem> getExpiryCallback(final Message message) {
    return item -> {
        final int timeoutSeconds = (int) item.getPollSeconds();

        final ChangeMessageVisibilityRequest.Builder requestBuilder = ChangeMessageVisibilityRequest.builder();
        requestBuilder.queueUrl(sqsOptions.getSqsUrl());
        requestBuilder.receiptHandle(message.receiptHandle());
        requestBuilder.visibilityTimeout(timeoutSeconds);
        final ChangeMessageVisibilityRequest request = requestBuilder.build();

        LOG.info("Setting visibility timeout for message {} to {}", message.messageId(), timeoutSeconds);
        sqsClient.changeMessageVisibility(request);
    };
}

void stop() {
isStopped = true;
Thread.currentThread().interrupt();
Expand Down

0 comments on commit bb37905

Please sign in to comment.