Skip to content

Commit

Permalink
Add unit tests for thread safety logic
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 25, 2023
1 parent f705ea0 commit 4ed5cfa
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
private static final ConcurrentHashMap<TopicPartition, Boolean> TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>();
private static Long topicEmptyCheckingOwnerThreadId = null;
private static long lastIsEmptyCheckTime = 0;
static final ConcurrentHashMap<TopicPartition, Boolean> TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>();
static Long topicEmptyCheckingOwnerThreadId = null;
static long lastIsEmptyCheckTime = 0;
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
Expand Down Expand Up @@ -557,8 +557,6 @@ public synchronized boolean isTopicEmpty() {
for (TopicPartition topicPartition : topicPartitions) {
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}",
topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, true);

// If there is data in the partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -479,6 +480,12 @@ public void isTopicEmpty_OnePartition_IsEmpty() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(1);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
when(partitionInfo.partition()).thenReturn(0);
when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
Expand All @@ -500,6 +507,12 @@ public void isTopicEmpty_OnePartition_PartitionNeverHadData() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(1);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
when(partitionInfo.partition()).thenReturn(0);
when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
Expand All @@ -520,6 +533,12 @@ public void isTopicEmpty_OnePartition_IsNotEmpty() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(1);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
when(partitionInfo.partition()).thenReturn(0);
when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
Expand All @@ -541,6 +560,12 @@ public void isTopicEmpty_OnePartition_NoCommittedPartition() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(1);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo));
when(partitionInfo.partition()).thenReturn(0);
when(kafkaConsumer.committed(anySet())).thenReturn(Collections.emptyMap());
Expand All @@ -561,6 +586,12 @@ public void isTopicEmpty_MultiplePartitions_AllEmpty() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(2);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo));
when(partitionInfo.partition()).thenReturn(0).thenReturn(1);
when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
Expand All @@ -585,6 +616,12 @@ public void isTopicEmpty_MultiplePartitions_OneNotEmpty() {
final List<TopicPartition> topicPartitions = buildTopicPartitions(2);

consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

when(kafkaConsumer.partitionsFor(TOPIC_NAME)).thenReturn(List.of(partitionInfo, partitionInfo));
when(partitionInfo.partition()).thenReturn(0).thenReturn(1);
when(kafkaConsumer.committed(anySet())).thenReturn(getTopicPartitionToMap(topicPartitions, offsetAndMetadata));
Expand All @@ -602,6 +639,34 @@ public void isTopicEmpty_MultiplePartitions_OneNotEmpty() {
verify(offsetAndMetadata, times(2)).offset();
}

@Test
public void isTopicEmpty_NonCheckerThread_ShortCircuits() {
consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = Thread.currentThread().getId() - 1;
KafkaCustomConsumer.lastIsEmptyCheckTime = 0;
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

assertThat(consumer.isTopicEmpty(), equalTo(true));

verifyNoInteractions(kafkaConsumer);
}

@Test
public void isTopicEmpty_CheckedWithinDelay_ShortCircuits() {
consumer = createObjectUnderTest("json", true);

// Reset the static variables
KafkaCustomConsumer.topicEmptyCheckingOwnerThreadId = null;
KafkaCustomConsumer.lastIsEmptyCheckTime = System.currentTimeMillis();
KafkaCustomConsumer.TOPIC_PARTITION_TO_IS_EMPTY.clear();

assertThat(consumer.isTopicEmpty(), equalTo(true));

verifyNoInteractions(kafkaConsumer);
}

private List<TopicPartition> buildTopicPartitions(final int partitionCount) {
return IntStream.range(0, partitionCount)
.mapToObj(i -> new TopicPartition(TOPIC_NAME, i))
Expand Down

0 comments on commit 4ed5cfa

Please sign in to comment.