From bb37905bd2a80be418c549b5eb3c86a93feb4192 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Sun, 29 Oct 2023 21:13:37 -0500 Subject: [PATCH] Add expiry monitor to adjust SQS viz timeout if processing has not yet completed Signed-off-by: Chase Engelbrecht --- .../acknowledgements/AcknowledgementSet.java | 8 +++ .../AcknowledgementSetManager.java | 3 + .../model/acknowledgements/ExpiryItem.java | 64 +++++++++++++++++++ .../DefaultAcknowledgementSet.java | 12 +++- .../DefaultAcknowledgementSetManager.java | 12 ++++ .../acknowledgements/ExpiryMonitor.java | 48 ++++++++++++++ .../InactiveAcknowledgementSetManager.java | 6 ++ .../plugins/source/s3/SqsWorker.java | 30 ++++++++- 8 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ExpiryItem.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/ExpiryMonitor.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java index c95c2e5f88..0ecae32f8b 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSet.java @@ -8,6 +8,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import java.time.Instant; + /** * AcknowledgmentSet keeps track of set of events that * belong to the batch of events that a source creates. @@ -58,4 +60,10 @@ public interface AcknowledgementSet { * initial events are going through the pipeline line. */ public void complete(); + + Instant getExpiryTime(); + + void setExpiryTime(final Instant expiryTime); + + boolean isDone(); } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java index 69c07c4aa5..bb7e9d433f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import java.time.Duration; +import java.time.Instant; import java.util.function.Consumer; /** @@ -30,6 +31,8 @@ public interface AcknowledgementSetManager { */ AcknowledgementSet create(final Consumer callback, final Duration timeout); + void addExpiryMonitor(final ExpiryItem expiryItem); + /** * Releases an event's reference * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ExpiryItem.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ExpiryItem.java new file mode 100644 index 0000000000..c60fcdaf2f --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/ExpiryItem.java @@ -0,0 +1,64 @@ +package org.opensearch.dataprepper.model.acknowledgements; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.function.Consumer; + +public class ExpiryItem { + private static final Logger LOG = LoggerFactory.getLogger(ExpiryItem.class); + + private final String itemId; + private final Instant startTime; + private final long pollSeconds; + private final Consumer expiryCallback; + private Instant expirationTime; + private AcknowledgementSet acknowledgementSet; + + public ExpiryItem(final String itemId, final Instant startTime, final long pollSeconds, final Instant expirationTime, + final Consumer expiryCallback, final AcknowledgementSet acknowledgementSet) { + this.itemId = itemId; + this.startTime = startTime; + if (pollSeconds <= 2) { + throw new UnsupportedOperationException("The poll interval must be at least 3 seconds to enable expiry monitoring"); + } + this.pollSeconds = pollSeconds; + this.expirationTime = expirationTime; + this.expiryCallback = expiryCallback; + this.acknowledgementSet = acknowledgementSet; + } + + public long getPollSeconds() { + return pollSeconds; + } + + public String getItemId() { + return itemId; + } + + public boolean executeExpiryCallback() { + try { + expiryCallback.accept(this); + return true; + } catch (final Exception e) { + LOG.error("Exception occurred when executing the expiry callback", e.getMessage()); + return false; + } + } + + public boolean isCompleteOrExpired() { + return acknowledgementSet.isDone() || Instant.now().isAfter(expirationTime); + } + + public void updateExpirationTime() { + LOG.info("Updating expiration time for item ID {} to {}", itemId, expirationTime); + expirationTime = expirationTime.plusSeconds(pollSeconds); + acknowledgementSet.setExpiryTime(expirationTime); + } + + public long getSecondsBetweenStartAndExpiration() { + return Duration.between(startTime, expirationTime).getSeconds(); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java index 3c8fe12159..53a9539dda 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSet.java @@ -26,7 +26,7 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet { private static final Logger LOG = LoggerFactory.getLogger(DefaultAcknowledgementSet.class); private final Consumer callback; - private final Instant expiryTime; + private Instant expiryTime; private final ExecutorService executor; // This lock protects all the non-final members private final ReentrantLock lock; @@ -36,7 +36,8 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet { private final DefaultAcknowledgementSetMetrics metrics; private boolean completed; - public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) { + public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer callback, final Duration expiryTime, + final DefaultAcknowledgementSetMetrics metrics) { this.callback = callback; this.result = true; this.executor = executor; @@ -76,6 +77,7 @@ public void acquire(final EventHandle eventHandle) { } } + @Override public boolean isDone() { lock.lock(); try { @@ -98,10 +100,16 @@ public boolean isDone() { return false; } + @Override public Instant getExpiryTime() { return expiryTime; } + @Override + public void setExpiryTime(final Instant expiryTime) { + this.expiryTime = expiryTime; + } + @Override public void complete() { lock.lock(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java index 104945960e..e8f0cc8155 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/DefaultAcknowledgementSetManager.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -26,6 +27,7 @@ public class DefaultAcknowledgementSetManager implements AcknowledgementSetManag private final AcknowledgementSetMonitorThread acknowledgementSetMonitorThread; private PluginMetrics pluginMetrics; private DefaultAcknowledgementSetMetrics metrics; + private ExpiryMonitor expiryMonitor = null; @Inject public DefaultAcknowledgementSetManager( @@ -40,6 +42,7 @@ public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, acknowledgementSetMonitorThread.start(); pluginMetrics = PluginMetrics.fromNames("acknowledgementSetManager", "acknowledgements"); metrics = new DefaultAcknowledgementSetMetrics(pluginMetrics); + } public AcknowledgementSet create(final Consumer callback, final Duration timeout) { @@ -49,6 +52,15 @@ public AcknowledgementSet create(final Consumer callback, final Duratio return acknowledgementSet; } + @Override + public void addExpiryMonitor(final ExpiryItem expiryItem) { + if (Objects.isNull(expiryMonitor)) { + this.expiryMonitor = new ExpiryMonitor(); + } + + expiryMonitor.addExpiryItem(expiryItem); + } + public void acquireEventReference(final Event event) { acquireEventReference(event.getEventHandle()); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/ExpiryMonitor.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/ExpiryMonitor.java new file mode 100644 index 0000000000..b9e8d8852b --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/ExpiryMonitor.java @@ -0,0 +1,48 @@ +package org.opensearch.dataprepper.acknowledgements; + +import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class ExpiryMonitor { + private static final Logger LOG = LoggerFactory.getLogger(ExpiryMonitor.class); + + private static final long CANCEL_CHECK_INTERVAL_SECONDS = 15; + private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); + private static final ConcurrentHashMap EXPIRY_MONITORS = new ConcurrentHashMap<>(); + + public ExpiryMonitor() { + SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> { + EXPIRY_MONITORS.forEach((expiryItem, future) -> { + final boolean isCompleteOrExpired = expiryItem.isCompleteOrExpired(); + LOG.error("ExpiryItem with ID {} has completion/expiry status {}", expiryItem.getItemId(), isCompleteOrExpired); + + if (isCompleteOrExpired) { + future.cancel(false); + EXPIRY_MONITORS.remove(expiryItem); + } + }); + }, 0, CANCEL_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + public void addExpiryItem(final ExpiryItem expiryItem) { + final ScheduledFuture expiryMonitor = SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(monitorExpiration(expiryItem), + 0, expiryItem.getPollSeconds() - 2, TimeUnit.SECONDS); + EXPIRY_MONITORS.put(expiryItem, expiryMonitor); + } + + private Runnable monitorExpiration(final ExpiryItem expiryItem) { + return () -> { + final boolean callBackSuccess = expiryItem.executeExpiryCallback(); + if (callBackSuccess) { + expiryItem.updateExpirationTime(); + } + }; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java index 2e112b4560..25c69a7cdf 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/InactiveAcknowledgementSetManager.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.acknowledgements; +import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -26,6 +27,11 @@ public AcknowledgementSet create(final Consumer callback, final Duratio throw new UnsupportedOperationException("create operation not supported"); } + @Override + public void addExpiryMonitor(final ExpiryItem expiryItem) { + throw new UnsupportedOperationException("add expiry monitor operation not supported"); + } + public void acquireEventReference(final Event event) { throw new UnsupportedOperationException("acquire operation not supported"); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 06a38d2393..3b8be9bcbd 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.acknowledgements.ExpiryItem; import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions; @@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; @@ -42,6 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; public class SqsWorker implements Runnable { @@ -52,6 +55,7 @@ public class SqsWorker implements Runnable { static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed"; static final String SQS_MESSAGE_DELAY_METRIC_NAME = "sqsMessageDelay"; static final String ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; + static final long VISIBILITY_TIMEOUT_INCREASE_BUFFER = 15L; private final S3SourceConfig s3SourceConfig; private final SqsClient sqsClient; @@ -105,7 +109,7 @@ public void run() { try { messagesProcessed = processSqsMessages(); } catch (final Exception e) { - LOG.error("Unable to process SQS messages. Processing error due to: {}", e.getMessage()); + LOG.error("Unable to process SQS messages", e); // There shouldn't be any exceptions caught here, but added backoff just to control the amount of logging in case of an exception is thrown. applyBackoff(); } @@ -236,6 +240,9 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { deleteSqsMessages(waitingForAcknowledgements); } }, Duration.ofSeconds(timeout)); + + final Instant expiryTime = Instant.now().plusMillis(Duration.ofSeconds(timeout).toMillis()); + acknowledgementSetManager.addExpiryMonitor(getExpiryItem(parsedMessage.getMessage(), expiryTime, acknowledgementSet)); } final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey()); final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); @@ -336,6 +343,27 @@ private S3ObjectReference populateS3Reference(final String bucketName, final Str .build(); } + private ExpiryItem getExpiryItem(final Message message, final Instant expirationTime, final AcknowledgementSet acknowledgementSet) { + final long visibilityTimeoutSeconds = sqsOptions.getVisibilityTimeout().getSeconds(); + return new ExpiryItem(message.messageId(), expirationTime.minusSeconds(visibilityTimeoutSeconds), visibilityTimeoutSeconds, expirationTime, + getExpiryCallback(message), acknowledgementSet); + } + + private Consumer getExpiryCallback(final Message message) { + return expiryItem -> { + final int newVisibilityTimeoutSeconds = (int) (expiryItem.getPollSeconds()); + + final ChangeMessageVisibilityRequest changeMessageVisibilityRequest = ChangeMessageVisibilityRequest.builder() + .visibilityTimeout(newVisibilityTimeoutSeconds) + .queueUrl(sqsOptions.getSqsUrl()) + .receiptHandle(message.receiptHandle()) + .build(); + + LOG.info("Setting visibility timeout for message {} to {}", message.messageId(), newVisibilityTimeoutSeconds); + sqsClient.changeMessageVisibility(changeMessageVisibilityRequest); + }; + } + void stop() { isStopped = true; Thread.currentThread().interrupt();