From 1cf0d55285185bcf84f248619c042b2f979e43a2 Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Sat, 11 Nov 2023 22:24:16 -0600
Subject: [PATCH] Refactor to use Admin client instead of second set of consumers for empty check

Signed-off-by: Chase Engelbrecht
---
 .../kafka/admin/KafkaAdminAccessor.java       | 103 ++++++
 .../plugins/kafka/buffer/KafkaBuffer.java     |  11 +-
 .../kafka/consumer/KafkaCustomConsumer.java   |  42 ---
 .../consumer/KafkaCustomConsumerFactory.java  |   3 +-
 .../plugins/kafka/source/KafkaSource.java     |   4 +-
 .../kafka/admin/KafkaAdminAccessorTest.java   | 302 ++++++++++++++++++
 .../plugins/kafka/buffer/KafkaBufferTest.java |  64 +---
 .../consumer/KafkaCustomConsumerTest.java     | 187 +---------
 8 files changed, 426 insertions(+), 290 deletions(-)
 create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessor.java
 create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessorTest.java

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessor.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessor.java
new file mode 100644
index 0000000000..14f27df110
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessor.java
@@ -0,0 +1,103 @@
+package org.opensearch.dataprepper.plugins.kafka.admin;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaClusterAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class KafkaAdminAccessor {
+    static final Logger LOG = LoggerFactory.getLogger(KafkaAdminAccessor.class);
+
+    private final AdminClient kafkaAdminClient;
+    private final TopicEmptinessMetadata topicEmptinessMetadata;
+    private final List<String> consumerGroupIds;
+
+    public KafkaAdminAccessor(final KafkaClusterAuthConfig kafkaClusterAuthConfig, final List<String> consumerGroupIds) {
+        Properties authProperties = new Properties();
+        KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaClusterAuthConfig, LOG);
+        this.kafkaAdminClient = KafkaAdminClient.create(authProperties);
+        this.topicEmptinessMetadata = new TopicEmptinessMetadata();
+        this.consumerGroupIds = consumerGroupIds;
+    }
+
+    @VisibleForTesting
+    KafkaAdminAccessor(final AdminClient kafkaAdminClient, final TopicEmptinessMetadata topicEmptinessMetadata, final List<String> consumerGroupIds) {
+        this.kafkaAdminClient = kafkaAdminClient;
+        this.topicEmptinessMetadata = topicEmptinessMetadata;
+        this.consumerGroupIds = consumerGroupIds;
+    }
+
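+    /**
+     * Reports whether every partition with a committed offset across the configured
+     * consumer groups has been fully consumed, i.e. each committed offset has caught
+     * up to the partition's end offset. Only the owning thread performs the check;
+     * other threads, and calls made within the check interval, get the cached result.
+     */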
+    public synchronized boolean areTopicsEmpty() {
+        final long currentThreadId = Thread.currentThread().getId();
+        if (Objects.isNull(topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId())) {
+            topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(currentThreadId);
+        }
+
+        if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() ||
+                topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) {
+            return topicEmptinessMetadata.isTopicEmpty();
+        }
+
+
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        for (String consumerGroupId: consumerGroupIds) {
+            final ListConsumerGroupOffsetsResult listConsumerGroupOffsets = kafkaAdminClient.listConsumerGroupOffsets(consumerGroupId);
+            try {
+                committedOffsets.putAll(listConsumerGroupOffsets.partitionsToOffsetAndMetadata().get());
+            } catch (final InterruptedException | ExecutionException e) {
+                LOG.error("Caught exception getting committed offset data", e);
+                return false;
+            }
+        }
+
+        final Map<TopicPartition, OffsetSpec> listOffsetsRequest = committedOffsets.keySet().stream()
+                .collect(Collectors.toMap(topicPartition -> topicPartition, topicPartition -> OffsetSpec.latest()));
+        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets;
+        try {
+            endOffsets = kafkaAdminClient.listOffsets(listOffsetsRequest).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.error("Caught exception getting end offset data", e);
+            return false;
+        }
+
+        for (TopicPartition topicPartition : committedOffsets.keySet()) {
+            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
+
+            if (!endOffsets.containsKey(topicPartition)) {
+                LOG.warn("No end offset found for topic partition: {}", topicPartition);
+                return false;
+            }
+            final long endOffset = endOffsets.get(topicPartition).offset();
+
+            topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
+
+            // If there is data in the partition
+            if (endOffset != 0L) {
+                // If there is no committed offset for the partition or the committed offset is behind the end offset
+                if (Objects.isNull(offsetAndMetadata) || offsetAndMetadata.offset() < endOffset) {
+                    topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
+                }
+            }
+        }
+
+        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
+        return topicEmptinessMetadata.isTopicEmpty();
+    }
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
index fd8f7365da..417612274b 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java
@@ -20,6 +20,7 @@
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
 import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
@@ -48,7 +49,7 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
     static final String WRITE = "Write";
     static final String READ = "Read";
     private final KafkaCustomProducer<Event> producer;
-    private final List<KafkaCustomConsumer> emptyCheckingConsumers;
+    private final KafkaAdminAccessor kafkaAdminAccessor;
     private final AbstractBuffer<Record<Event>> innerBuffer;
     private final ExecutorService executorService;
     private final Duration drainTimeout;
@@ -73,8 +74,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
         final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
         final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
                 innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
-        emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
-                innerBuffer, pluginMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null);
+        this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
 
         this.executorService = Executors.newFixedThreadPool(consumers.size());
         consumers.forEach(this.executorService::submit);
@@ -130,10 +130,7 @@ public void doCheckpoint(CheckpointState checkpointState) {
 
     @Override
     public boolean isEmpty() {
-        final boolean areTopicsEmpty = emptyCheckingConsumers.stream()
-                .allMatch(KafkaCustomConsumer::isTopicEmpty);
-
-        return areTopicsEmpty && innerBuffer.isEmpty();
+        return kafkaAdminAccessor.areTopicsEmpty() && innerBuffer.isEmpty();
     }
 
     @Override
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
index d9ab131a68..5c4035d6c1 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
@@ -16,7 +16,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
@@ -56,7 +55,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;
 
@@ -95,7 +93,6 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
     private long numRecordsCommitted = 0;
     private final LogRateLimiter errLogRateLimiter;
    private final ByteDecoder byteDecoder;
-    private final TopicEmptinessMetadata topicEmptinessMetadata;
 
     public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AtomicBoolean shutdownInProgress,
@@ -106,7 +103,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AcknowledgementSetManager acknowledgementSetManager,
                                final ByteDecoder byteDecoder,
                                final KafkaTopicConsumerMetrics topicMetrics,
-                               final TopicEmptinessMetadata topicEmptinessMetadata,
                                final PauseConsumePredicate pauseConsumePredicate) {
         this.topicName = topicConfig.getName();
         this.topicConfig = topicConfig;
@@ -132,7 +128,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.lastCommitTime = System.currentTimeMillis();
         this.numberOfAcksPending = new AtomicInteger(0);
         this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
-        this.topicEmptinessMetadata = topicEmptinessMetadata;
     }
 
     KafkaTopicConsumerMetrics getTopicMetrics() {
@@ -537,41 +532,4 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
         final Long offset = offsetMap.get(topicPartition);
         return Objects.isNull(offset) ? "-" : offset.toString();
     }
-
-    public synchronized boolean isTopicEmpty() {
-        final long currentThreadId = Thread.currentThread().getId();
-        if (Objects.isNull(topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId())) {
-            topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(currentThreadId);
-        }
-
-        if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() ||
-                topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) {
-            return topicEmptinessMetadata.isTopicEmpty();
-        }
-
-        final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
-        final List<TopicPartition> topicPartitions = partitions.stream()
-                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
-                .collect(Collectors.toList());
-
-        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitions));
-        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
-
-        for (TopicPartition topicPartition : topicPartitions) {
-            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
-            final Long endOffset = endOffsets.get(topicPartition);
-            topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
-
-            // If there is data in the partition
-            if (endOffset != 0L) {
-                // If there is no committed offset for the partition or the committed offset is behind the end offset
-                if (Objects.isNull(offsetAndMetadata) || offsetAndMetadata.offset() < endOffset) {
-                    topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
-                }
-            }
-        }
-
-        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
-        return topicEmptinessMetadata.isTopicEmpty();
-    }
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
index a50702ff3a..9e81d6ff30 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java
@@ -91,7 +91,6 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf
 
         try {
             final int numWorkers = topic.getWorkers();
-            final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata();
             IntStream.range(0, numWorkers).forEach(index -> {
                 KafkaDataConfig dataConfig = new KafkaDataConfigAdapter(keyFactory, topic);
                 Deserializer keyDeserializer = (Deserializer) serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
@@ -105,7 +104,7 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf
                 final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);
 
                 consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic,
-                        schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, topicEmptinessMetadata, pauseConsumePredicate));
+                        schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, pauseConsumePredicate));
             });
 
         } catch (Exception e) {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 49fec5646a..ec27f1f370 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -38,7 +38,6 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
 import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
-import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;
 import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
@@ -118,7 +117,6 @@ public void start(Buffer<Record<Event>> buffer) {
                 int numWorkers = topic.getWorkers();
                 executorService = Executors.newFixedThreadPool(numWorkers);
                 allTopicExecutorServices.add(executorService);
-                final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata();
 
                 IntStream.range(0, numWorkers).forEach(index -> {
                     while (true) {
@@ -142,7 +140,7 @@ public void start(Buffer<Record<Event>> buffer) {
                         }
 
                         consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
-                                acknowledgementSetManager, null, topicMetrics, topicEmptinessMetadata, PauseConsumePredicate.noPause());
+                                acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
                         allTopicConsumers.add(consumer);
 
                         executorService.submit(consumer);
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessorTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessorTest.java
new file mode 100644
index 0000000000..752bc91a32
--- /dev/null
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/admin/KafkaAdminAccessorTest.java
@@ -0,0 +1,302 @@
+package org.opensearch.dataprepper.plugins.kafka.admin;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class KafkaAdminAccessorTest {
+    private static final String CONSUMER_GROUP_ID = UUID.randomUUID().toString();
+    private static final String TOPIC_NAME = UUID.randomUUID().toString();
+    private static final Random RANDOM = new Random();
+
+    @Mock
+    private AdminClient kafkaAdminClient;
+    @Mock
+    private OffsetAndMetadata offsetAndMetadata;
+    @Mock
+    private ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult;
+    @Mock
+    private ListOffsetsResult listOffsetsResult;
+    @Mock
+    private KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> committedOffsetsFuture;
+    @Mock
+    private KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> endOffsetsFuture;
+    @Mock
+    private ListOffsetsResult.ListOffsetsResultInfo listOffsetsResultInfo;
+    private TopicEmptinessMetadata topicEmptinessMetadata;
+
+    private KafkaAdminAccessor kafkaAdminAccessor;
+
+    @BeforeEach
+    public void setup() {
+        MockitoAnnotations.openMocks(this);
+    }
+
+    public KafkaAdminAccessor createObjectUnderTest() {
+        topicEmptinessMetadata = new TopicEmptinessMetadata();
+        return new KafkaAdminAccessor(kafkaAdminClient, topicEmptinessMetadata, List.of(CONSUMER_GROUP_ID));
+    }
+
+    @Test
+    public void areTopicsEmpty_OnePartition_IsEmpty() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_OnePartition_PartitionNeverHadData() throws ExecutionException, InterruptedException {
+        final Long offset = 0L;
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        when(offsetAndMetadata.offset()).thenReturn(offset - 1);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_OnePartition_IsNotEmpty() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        when(offsetAndMetadata.offset()).thenReturn(offset - 1);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_OnePartition_NoCommittedPartition() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        final HashMap<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(topicPartitions.get(0), null);
+        when(committedOffsetsFuture.get()).thenReturn(committedOffsets);
+
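+        // No committed offset for a partition that has data means the group has not consumed it, so the topic is not empty.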
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_MultiplePartitions_AllEmpty() throws ExecutionException, InterruptedException {
+        final Long offset1 = RANDOM.nextLong();
+        final Long offset2 = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(2);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset1);
+        when(listOffsetsResultInfo.offset()).thenReturn(offset1).thenReturn(offset2);
+        when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_MultiplePartitions_OneNotEmpty() throws ExecutionException, InterruptedException {
+        final Long offset1 = RANDOM.nextLong();
+        final Long offset2 = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(2);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset1);
+        when(listOffsetsResultInfo.offset()).thenReturn(offset1).thenReturn(offset2);
+        when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2 - 1);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_NonCheckerThread_ShortCircuits() {
+        kafkaAdminAccessor = createObjectUnderTest();
+
+        topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(Thread.currentThread().getId() - 1);
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        verifyNoInteractions(kafkaAdminClient);
+    }
+
+    @Test
+    public void areTopicsEmpty_CheckedWithinDelay_ShortCircuits() {
+        kafkaAdminAccessor = createObjectUnderTest();
+
+        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        verifyNoInteractions(kafkaAdminClient);
+    }
+
+    @Test
+    public void areTopicsEmpty_ExceptionGettingCommittedOffsets_ReturnsFalse() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        when(committedOffsetsFuture.get()).thenThrow(new InterruptedException());
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verify(kafkaAdminClient).listConsumerGroupOffsets(CONSUMER_GROUP_ID);
+        verifyNoMoreInteractions(kafkaAdminClient);
+    }
+
+    @Test
+    public void areTopicsEmpty_ExceptionGettingEndOffsets_ReturnsFalse() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        when(endOffsetsFuture.get()).thenThrow(new InterruptedException());
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_MissingEndOffset_ReturnsFalse() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
+
+        kafkaAdminAccessor = createObjectUnderTest();
+        mockAdminClientCalls(topicPartitions, offset);
+        when(endOffsetsFuture.get()).thenReturn(Collections.emptyMap());
+
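+        // A committed partition that is missing from the listOffsets response is treated conservatively as not empty.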
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(false));
+
+        verifyAdminClientCalls(topicPartitions);
+    }
+
+    @Test
+    public void areTopicsEmpty_MultipleConsumerGroups() throws ExecutionException, InterruptedException {
+        final Long offset = RANDOM.nextLong();
+        final List<TopicPartition> topicPartitions1 = List.of(new TopicPartition(TOPIC_NAME, 0));
+        final List<TopicPartition> topicPartitions2 = List.of(new TopicPartition(UUID.randomUUID().toString(), 0));
+
+        topicEmptinessMetadata = new TopicEmptinessMetadata();
+        final String consumerGroupId2 = UUID.randomUUID().toString();
+        kafkaAdminAccessor = new KafkaAdminAccessor(kafkaAdminClient, topicEmptinessMetadata,
+                List.of(CONSUMER_GROUP_ID, consumerGroupId2));
+
+        mockAdminClientCalls(topicPartitions1, offset);
+        when(kafkaAdminClient.listConsumerGroupOffsets(CONSUMER_GROUP_ID)).thenReturn(listConsumerGroupOffsetsResult);
+        when(kafkaAdminClient.listConsumerGroupOffsets(consumerGroupId2)).thenReturn(listConsumerGroupOffsetsResult);
+        when(committedOffsetsFuture.get())
+                .thenReturn(getTopicPartitionToMap(topicPartitions1, offsetAndMetadata))
+                .thenReturn(getTopicPartitionToMap(topicPartitions2, offsetAndMetadata));
+        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getTopicPartitionToMap(topicPartitions1, listOffsetsResultInfo);
+        endOffsets.putAll(getTopicPartitionToMap(topicPartitions2, listOffsetsResultInfo));
+        when(endOffsetsFuture.get()).thenReturn(endOffsets);
+
+        assertThat(kafkaAdminAccessor.areTopicsEmpty(), equalTo(true));
+
+        final ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+        verify(kafkaAdminClient, times(2)).listConsumerGroupOffsets(argumentCaptor.capture());
+        assertThat(argumentCaptor.getAllValues().size(), equalTo(2));
+        assertThat(argumentCaptor.getAllValues().get(0), equalTo(CONSUMER_GROUP_ID));
+        assertThat(argumentCaptor.getAllValues().get(1), equalTo(consumerGroupId2));
+
+        verify(kafkaAdminClient).listOffsets(argThat(r -> {
+            final List<TopicPartition> combinedTopicPartitions = Stream.concat(topicPartitions1.stream(), topicPartitions2.stream())
+                    .collect(Collectors.toList());
+            assertAll("ListOffsets request fields match",
+                    () -> assertThat("Request map size matches", r.size(), equalTo(combinedTopicPartitions.size())),
+                    () -> assertThat("TopicPartitions in keyset",
+                            combinedTopicPartitions.stream().allMatch(topicPartition -> r.containsKey(topicPartition))),
+                    () -> assertThat("OffsetSpec matches",
+                            combinedTopicPartitions.stream().allMatch(topicPartition -> r.get(topicPartition) instanceof OffsetSpec.LatestSpec))
+            );
+            return true;
+        }));
+    }
+
+    private void mockAdminClientCalls(final List<TopicPartition> topicPartitions, final long offset) throws ExecutionException, InterruptedException {
+        when(kafkaAdminClient.listConsumerGroupOffsets(CONSUMER_GROUP_ID)).thenReturn(listConsumerGroupOffsetsResult);
+        when(listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata()).thenReturn(committedOffsetsFuture);
+        when(committedOffsetsFuture.get()).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
+        when(kafkaAdminClient.listOffsets(any())).thenReturn(listOffsetsResult);
+        when(listOffsetsResult.all()).thenReturn(endOffsetsFuture);
+        when(endOffsetsFuture.get()).thenReturn(getTopicPartitionToMap(topicPartitions, listOffsetsResultInfo));
+        when(listOffsetsResultInfo.offset()).thenReturn(offset);
+        when(offsetAndMetadata.offset()).thenReturn(offset);
+    }
+
+    private void verifyAdminClientCalls(final List<TopicPartition> topicPartitions) {
+        verify(kafkaAdminClient).listConsumerGroupOffsets(CONSUMER_GROUP_ID);
+        verify(kafkaAdminClient).listOffsets(argThat(r -> {
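+            // assertAll raises a detailed failure if any request field mismatches; returning true then accepts the match.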
+            assertAll("ListOffsets request fields match",
+                    () -> assertThat("Request map size matches", r.size(), equalTo(topicPartitions.size())),
+                    () -> assertThat("TopicPartitions in keyset",
+                            topicPartitions.stream().allMatch(topicPartition -> r.containsKey(topicPartition))),
+                    () -> assertThat("OffsetSpec matches",
+                            topicPartitions.stream().allMatch(topicPartition -> r.get(topicPartition) instanceof OffsetSpec.LatestSpec))
+            );
+            return true;
+        }));
+    }
+
+    private List<TopicPartition> buildTopicPartitions(final int partitionCount) {
+        return IntStream.range(0, partitionCount)
+                .mapToObj(i -> new TopicPartition(TOPIC_NAME, i))
+                .collect(Collectors.toList());
+    }
+
+    private <T> Map<TopicPartition, T> getTopicPartitionToMap(final List<TopicPartition> topicPartitions, final T value) {
+        return topicPartitions.stream()
+                .collect(Collectors.toMap(i -> i, i -> value));
+    }
+}
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java
index 9a984115d1..1d31bcf2c2 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java
@@ -25,6 +25,7 @@
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
@@ -63,7 +64,6 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
@@ -119,6 +119,9 @@ class KafkaBufferTest {
     @Mock
     private KafkaCustomConsumer consumer;
 
+    @Mock
+    private KafkaAdminAccessor kafkaAdminAccessor;
+
     @Mock
     BlockingBuffer<Record<Event>> blockingBuffer;
 
@@ -145,6 +148,8 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
             consumerFactory = mock;
             when(consumerFactory.createConsumersForTopic(any(), any(), any(), any(), any(), any(), any(), anyBoolean(), any())).thenReturn(consumers);
         });
+        final MockedConstruction<KafkaAdminAccessor> adminAccessorMock =
+                mockConstruction(KafkaAdminAccessor.class, (mock, context) -> kafkaAdminAccessor = mock);
         final MockedConstruction<BlockingBuffer> blockingBufferMock =
                 mockConstruction(BlockingBuffer.class, (mock, context) -> {
                     blockingBuffer = mock;
@@ -237,13 +242,13 @@ void test_kafkaBuffer_isEmpty_True() {
         kafkaBuffer = createObjectUnderTest();
         assertTrue(Objects.nonNull(kafkaBuffer));
         when(blockingBuffer.isEmpty()).thenReturn(true);
-        when(consumer.isTopicEmpty()).thenReturn(true);
+        when(kafkaAdminAccessor.areTopicsEmpty()).thenReturn(true);
 
         final boolean result = kafkaBuffer.isEmpty();
         assertThat(result, equalTo(true));
 
         verify(blockingBuffer).isEmpty();
-        verify(consumer).isTopicEmpty();
+        verify(kafkaAdminAccessor).areTopicsEmpty();
     }
 
     @Test
@@ -251,13 +256,13 @@ void test_kafkaBuffer_isEmpty_BufferNotEmpty() {
         kafkaBuffer = createObjectUnderTest();
         assertTrue(Objects.nonNull(kafkaBuffer));
         when(blockingBuffer.isEmpty()).thenReturn(false);
-        when(consumer.isTopicEmpty()).thenReturn(true);
+        when(kafkaAdminAccessor.areTopicsEmpty()).thenReturn(true);
 
         final boolean result = kafkaBuffer.isEmpty();
         assertThat(result, equalTo(false));
 
         verify(blockingBuffer).isEmpty();
-        verify(consumer).isTopicEmpty();
+        verify(kafkaAdminAccessor).areTopicsEmpty();
     }
 
     @Test
@@ -265,13 +270,13 @@ void test_kafkaBuffer_isEmpty_TopicNotEmpty() {
         kafkaBuffer = createObjectUnderTest();
         assertTrue(Objects.nonNull(kafkaBuffer));
         when(blockingBuffer.isEmpty()).thenReturn(true);
-        when(consumer.isTopicEmpty()).thenReturn(false);
+        when(kafkaAdminAccessor.areTopicsEmpty()).thenReturn(false);
 
         final boolean result = kafkaBuffer.isEmpty();
         assertThat(result, equalTo(false));
 
         verifyNoInteractions(blockingBuffer);
-        verify(consumer).isTopicEmpty();
+        verify(kafkaAdminAccessor).areTopicsEmpty();
     }
 
     @Test
@@ -279,54 +284,13 @@ void test_kafkaBuffer_isEmpty_MultipleTopics_AllNotEmpty() {
         kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
         assertTrue(Objects.nonNull(kafkaBuffer));
         when(blockingBuffer.isEmpty()).thenReturn(true);
-        when(consumer.isTopicEmpty()).thenReturn(false).thenReturn(false);
+        when(kafkaAdminAccessor.areTopicsEmpty()).thenReturn(false).thenReturn(false);
 
         final boolean result = kafkaBuffer.isEmpty();
         assertThat(result, equalTo(false));
 
         verifyNoInteractions(blockingBuffer);
-        verify(consumer).isTopicEmpty();
-    }
-
-    @Test
-    void test_kafkaBuffer_isEmpty_MultipleTopics_SomeNotEmpty() {
-        kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
-        assertTrue(Objects.nonNull(kafkaBuffer));
-        when(blockingBuffer.isEmpty()).thenReturn(true);
-        when(consumer.isTopicEmpty()).thenReturn(true).thenReturn(false);
-
-        final boolean result = kafkaBuffer.isEmpty();
-        assertThat(result, equalTo(false));
-
-        verifyNoInteractions(blockingBuffer);
-        verify(consumer, times(2)).isTopicEmpty();
-    }
-
-    @Test
-    void test_kafkaBuffer_isEmpty_MultipleTopics_AllEmpty() {
-        kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
-        assertTrue(Objects.nonNull(kafkaBuffer));
-        when(blockingBuffer.isEmpty()).thenReturn(true);
-        when(consumer.isTopicEmpty()).thenReturn(true).thenReturn(true);
-
-        final boolean result = kafkaBuffer.isEmpty();
-        assertThat(result, equalTo(true));
-
-        verify(blockingBuffer).isEmpty();
-        verify(consumer, times(2)).isTopicEmpty();
-    }
-
-    @Test
-    void test_kafkaBuffer_isEmpty_ZeroTopics() {
-        kafkaBuffer = createObjectUnderTest(Collections.emptyList());
-        assertTrue(Objects.nonNull(kafkaBuffer));
-        when(blockingBuffer.isEmpty()).thenReturn(true);
-
-        final boolean result = kafkaBuffer.isEmpty();
-        assertThat(result, equalTo(true));
-
-        verify(blockingBuffer).isEmpty();
-        verifyNoInteractions(consumer);
+        verify(kafkaAdminAccessor).areTopicsEmpty();
     }
 
     @Test
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
index 7943c07419..7d3a0f3fb9 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
@@ -14,7 +14,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.junit.jupiter.api.Assertions;
@@ -43,38 +42,26 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class KafkaCustomConsumerTest {
     private static final String TOPIC_NAME = "topic1";
-    private static final Random RANDOM = new Random();
 
     @Mock
     private KafkaConsumer kafkaConsumer;
@@ -94,10 +81,6 @@ public class KafkaCustomConsumerTest {
     @Mock
     private KafkaTopicConsumerMetrics topicMetrics;
 
-    @Mock
-    private PartitionInfo partitionInfo;
-    @Mock
-    private OffsetAndMetadata offsetAndMetadata;
 
     @Mock
     private PauseConsumePredicate pauseConsumePredicate;
@@ -126,7 +109,6 @@ public class KafkaCustomConsumerTest {
     private Duration delayTime;
     private double posCount;
     private double negCount;
-    private TopicEmptinessMetadata topicEmptinessMetadata;
 
     @BeforeEach
     public void setUp() {
@@ -166,10 +148,9 @@ public void setUp() {
     }
 
     public KafkaCustomConsumer createObjectUnderTest(String schemaType, boolean acknowledgementsEnabled) {
-        topicEmptinessMetadata = new TopicEmptinessMetadata();
         when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(acknowledgementsEnabled);
         return new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topicConfig, schemaType,
-                acknowledgementSetManager, null, topicMetrics, topicEmptinessMetadata, pauseConsumePredicate);
+                acknowledgementSetManager, null, topicMetrics, pauseConsumePredicate);
     }
 
     private BlockingBuffer<Record<Event>> getBuffer() {
@@ -478,172 +459,6 @@ public void testAwsGlueErrorWithAcknowledgements() throws Exception {
         });
     }
 
-    @Test
-    public void isTopicEmpty_OnePartition_IsEmpty() {
-        final Long offset = RANDOM.nextLong();
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0);
-        when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset));
-        when(offsetAndMetadata.offset()).thenReturn(offset);
-
-        assertThat(consumer.isTopicEmpty(), equalTo(true));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo).partition();
-        verify(offsetAndMetadata).offset();
-    }
-
-    @Test
-    public void isTopicEmpty_OnePartition_PartitionNeverHadData() {
-        final Long offset = 0L;
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0);
-        when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset));
-        when(offsetAndMetadata.offset()).thenReturn(offset - 1);
-
-        assertThat(consumer.isTopicEmpty(), equalTo(true));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo).partition();
-    }
-
-    @Test
-    public void isTopicEmpty_OnePartition_IsNotEmpty() {
-        final Long offset = RANDOM.nextLong();
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0);
-        when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset));
-        when(offsetAndMetadata.offset()).thenReturn(offset - 1);
-
-        assertThat(consumer.isTopicEmpty(), equalTo(false));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo).partition();
-        verify(offsetAndMetadata).offset();
-    }
-
-    @Test
-    public void isTopicEmpty_OnePartition_NoCommittedPartition() {
-        final Long offset = RANDOM.nextLong();
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(1);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0);
-        when(kafkaConsumer.committed(anySet())).thenReturn(Collections.emptyMap());
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(getTopicPartitionToMap(topicPartitions, offset));
-
-        assertThat(consumer.isTopicEmpty(), equalTo(false));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo).partition();
-    }
-
-    @Test
-    public void isTopicEmpty_MultiplePartitions_AllEmpty() {
-        final Long offset1 = RANDOM.nextLong();
-        final Long offset2 = RANDOM.nextLong();
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(2);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0).thenReturn(1);
-        when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
-        final Map<TopicPartition, Long> endOffsets = getTopicPartitionToMap(topicPartitions, offset1);
-        endOffsets.put(topicPartitions.get(1), offset2);
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(endOffsets);
-        when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2);
-
-        assertThat(consumer.isTopicEmpty(), equalTo(true));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo, times(2)).partition();
-        verify(offsetAndMetadata, times(2)).offset();
-    }
-
-    @Test
-    public void isTopicEmpty_MultiplePartitions_OneNotEmpty() {
-        final Long offset1 = RANDOM.nextLong();
-        final Long offset2 = RANDOM.nextLong();
-        final List<TopicPartition> topicPartitions = buildTopicPartitions(2);
-
-        consumer = createObjectUnderTest("json", true);
-
-        when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo));
-        when(partitionInfo.partition()).thenReturn(0).thenReturn(1);
-        when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
-        final Map<TopicPartition, Long> endOffsets = getTopicPartitionToMap(topicPartitions, offset1);
-        endOffsets.put(topicPartitions.get(1), offset2);
-        when(kafkaConsumer.endOffsets(anyCollection())).thenReturn(endOffsets);
-        when(offsetAndMetadata.offset()).thenReturn(offset1).thenReturn(offset2 - 1);
-
-        assertThat(consumer.isTopicEmpty(), equalTo(false));
-
-        verify(kafkaConsumer).partitionsFor(TOPIC_NAME);
-        verify(kafkaConsumer).committed(new HashSet<>(topicPartitions));
-        verify(kafkaConsumer).endOffsets(topicPartitions);
-        verify(partitionInfo, times(2)).partition();
-        verify(offsetAndMetadata, times(2)).offset();
-    }
-
-    @Test
-    public void isTopicEmpty_NonCheckerThread_ShortCircuits() {
-        consumer = createObjectUnderTest("json", true);
-
-        topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(Thread.currentThread().getId() - 1);
-        assertThat(consumer.isTopicEmpty(), equalTo(true));
-
-        verifyNoInteractions(kafkaConsumer);
-    }
-
-    @Test
-    public void isTopicEmpty_CheckedWithinDelay_ShortCircuits() {
-        consumer = createObjectUnderTest("json", true);
-
-        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
-        assertThat(consumer.isTopicEmpty(), equalTo(true));
-
-        verifyNoInteractions(kafkaConsumer);
-    }
-
-    private List<TopicPartition> buildTopicPartitions(final int partitionCount) {
-        return IntStream.range(0, partitionCount)
-                .mapToObj(i -> new TopicPartition(TOPIC_NAME, i))
-                .collect(Collectors.toList());
-    }
-
-    private <T> Map<TopicPartition, T> getTopicPartitionToMap(final List<TopicPartition> topicPartitions, final T value) {
-        return topicPartitions.stream()
-                .collect(Collectors.toMap(i -> i, i -> value));
-    }
-
     private ConsumerRecords createPlainTextRecords(String topic, final long startOffset) {
         Map<TopicPartition, List<ConsumerRecord>> records = new HashMap<>();
         ConsumerRecord record1 = new ConsumerRecord<>(topic, testPartition, startOffset, testKey1, testValue1);