From e53ad970fecc8d0f5971c4abe482438aff4d9906 Mon Sep 17 00:00:00 2001 From: ArtemTetenkin Date: Tue, 23 Apr 2024 15:38:32 -0700 Subject: [PATCH] Revert memq consumer reset logic --- .../memq/MemqPreFetchIteratorAdapter.java | 31 ---------- .../psc/consumer/memq/PscMemqConsumer.java | 56 +------------------ 2 files changed, 1 insertion(+), 86 deletions(-) delete mode 100644 psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java deleted file mode 100644 index 1b1edfc..0000000 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/MemqPreFetchIteratorAdapter.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.pinterest.psc.consumer.memq; - -import com.pinterest.psc.common.CloseableIterator; - -import java.io.IOException; -import java.util.List; - -public class MemqPreFetchIteratorAdapter implements CloseableIterator { - - private final List messages; - private int index = 0; - - public MemqPreFetchIteratorAdapter(List messages) { - this.messages = messages; - } - - @Override - public void close() throws IOException { - messages.clear(); - } - - @Override - public boolean hasNext() { - return index < messages.size(); - } - - @Override - public T next() { - return messages.get(index++); - } -} diff --git a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java index 0267561..bcaf3f4 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/memq/PscMemqConsumer.java @@ -33,14 +33,10 @@ import com.pinterest.psc.metrics.MetricName; import com.pinterest.psc.metrics.PscMetricRegistryManager; import com.pinterest.psc.metrics.PscMetrics; - -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.logging.Log; -import software.amazon.awssdk.core.exception.SdkClientException; import java.io.IOException; import java.time.Duration; @@ -277,11 +273,7 @@ public PscConsumerPollMessageIterator poll(Duration pollTimeout) throws Co CloseableIterator> memqLogMessageIterator; try { MutableInt count = new MutableInt(); - if (pscConfigurationInternal.isAutoResolutionEnabled()) { - memqLogMessageIterator = preFetchPollResultIntoMemoryWithAutoResolution(memqConsumer.poll(pollTimeout, count)); - } else { - memqLogMessageIterator = new MemqIteratorAdapter<>(memqConsumer.poll(pollTimeout, count)); - } + memqLogMessageIterator = new MemqIteratorAdapter<>(memqConsumer.poll(pollTimeout, count)); } catch (NoTopicsSubscribedException e) { throw new ConsumerException("[Memq] Consumer is not subscribed to any topic.", e); } catch (IOException e) { @@ -301,52 +293,6 @@ memqLogMessageIterator, backendTopicToTopicUri, getConsumerInterceptors(), initialSeekOffsets); } - private MemqPreFetchIteratorAdapter> preFetchPollResultIntoMemoryWithAutoResolution(com.pinterest.memq.commons.CloseableIterator> it) { - List> preFetched = new ArrayList<>(); - int count = 0; - while (it.hasNext()) { - try { - preFetched.add(it.next()); - count++; - } catch (SdkClientException e) { - logger.warn("Error while pre-fetching messages, " + - "resetting backend MemQ consumer and returning " + count + " messages in iterator for now", e); - try { - it.close(); - resetBackendClient(); - break; - } catch (ConsumerException | IOException ex) { - throw new RuntimeException("Failed to reset backend Memq consumer", ex); - } - } - } - return new MemqPreFetchIteratorAdapter<>(preFetched); - - } - - protected void resetBackendClient() throws ConsumerException { - super.resetBackendClient(); - executeBackendCallWithRetries(() -> { - try { - memqConsumer.close(); - } catch (IOException e) { - throw new RuntimeException("Failed to close Memq consumer instance.", e); - } - }); - - try { - memqConsumer = new MemqConsumer<>(properties); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate a Memq consumer instance.", e); - } - - if (!currentAssignment.isEmpty()) - assign(currentAssignment); - else if (!currentSubscription.isEmpty()) - subscribe(currentSubscription); - - } - private void handleMemqConsumerMetrics(MetricRegistry metricRegistry) { }