diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 8aea91e50f..38daf566f5 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -70,9 +70,9 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; - private static final ConcurrentHashMap TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>(); - private static Long topicEmptyCheckingOwnerThreadId = null; - private static long lastIsEmptyCheckTime = 0; + static final ConcurrentHashMap TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>(); + static Long topicEmptyCheckingOwnerThreadId = null; + static long lastIsEmptyCheckTime = 0; static final String DEFAULT_KEY = "message"; private volatile long lastCommitTime; @@ -557,8 +557,6 @@ public synchronized boolean isTopicEmpty() { for (TopicPartition topicPartition : topicPartitions) { final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); - LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}", - topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset()); TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, true); // If there is data in the partition diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java index 8f2a4918f5..d36efb188b 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java @@ -68,6 +68,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -479,6 +480,12 @@ public void isTopicEmpty_OnePartition_IsEmpty() { final List topicPartitions = buildTopicPartitions(1); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); when(partitionInfo.partition()).thenReturn(0); when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); @@ -500,6 +507,12 @@ public void isTopicEmpty_OnePartition_PartitionNeverHadData() { final List topicPartitions = buildTopicPartitions(1); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); when(partitionInfo.partition()).thenReturn(0); when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); @@ -520,6 +533,12 @@ public void isTopicEmpty_OnePartition_IsNotEmpty() { final List topicPartitions = buildTopicPartitions(1); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); when(partitionInfo.partition()).thenReturn(0); when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); @@ -541,6 +560,12 @@ public void isTopicEmpty_OnePartition_NoCommittedPartition() { final List topicPartitions = buildTopicPartitions(1); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo)); when(partitionInfo.partition()).thenReturn(0); when(kafkaConsumer.committed(anySet())).thenReturn(Collections.emptyMap()); @@ -561,6 +586,12 @@ public void isTopicEmpty_MultiplePartitions_AllEmpty() { final List topicPartitions = buildTopicPartitions(2); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo)); when(partitionInfo.partition()).thenReturn(0).thenReturn(1); when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); @@ -585,6 +616,12 @@ public void isTopicEmpty_MultiplePartitions_OneNotEmpty() { final List topicPartitions = buildTopicPartitions(2); consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo)); when(partitionInfo.partition()).thenReturn(0).thenReturn(1); when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata)); @@ -602,6 +639,34 @@ public void isTopicEmpty_MultiplePartitions_OneNotEmpty() { verify(offsetAndMetadata, times(2)).offset(); } + @Test + public void isTopicEmpty_NonCheckerThread_ShortCircuits() { + consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = Thread.currentThread().getId() - 1; + KafkaCustomConsumer.lastIsEmptyCheckTime = 0; + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + + assertThat(consumer.isTopicEmpty(), equalTo(true)); + + verifyNoInteractions(kafkaConsumer); + } + + @Test + public void isTopicEmpty_CheckedWithinDelay_ShortCircuits() { + consumer = createObjectUnderTest("json", true); + + // Reset the static variables + KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null; + KafkaCustomConsumer.lastIsEmptyCheckTime = System.currentTimeMillis(); + KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear(); + + assertThat(consumer.isTopicEmpty(), equalTo(true)); + + verifyNoInteractions(kafkaConsumer); + } + private List buildTopicPartitions(final int partitionCount) { return IntStream.range(0, partitionCount) .mapToObj(i -> new TopicPartition(TOPIC_NAME, i))