Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 24, 2023
1 parent a85a4fc commit 0c1fe06
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ public void doCheckpoint(CheckpointState checkpointState) {

@Override
public boolean isEmpty() {
LOG.info("Consumers: {}", emptyCheckingConsumers.size());
final boolean areTopicsEmpty = emptyCheckingConsumers.stream()
.allMatch(KafkaCustomConsumer::isEmpty);
.allMatch(KafkaCustomConsumer::isTopicEmpty);
LOG.info("Are topics empty: {}", areTopicsEmpty);

// TODO: check Kafka topic is empty as well.
return areTopicsEmpty && innerBuffer.isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
return Objects.isNull(offset) ? "-" : offset.toString();
}

public boolean isEmpty() {
public boolean isTopicEmpty() {
final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
Expand All @@ -543,8 +543,6 @@ public boolean isEmpty() {
for (TopicPartition topicPartition : topicPartitions) {
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}",
topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());

// If there is data in the partition
if (endOffset != 0L) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaBufferConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
Expand All @@ -33,6 +35,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
Expand All @@ -55,7 +58,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;

Expand Down Expand Up @@ -103,21 +108,35 @@ class KafkaBufferTest {
@Mock
KafkaCustomProducer<Event> producer;

@Mock
private KafkaCustomConsumerFactory consumerFactory;

@Mock
private KafkaCustomConsumer consumer;

@Mock
BlockingBuffer<Record<Event>> blockingBuffer;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

// Convenience overload: builds the buffer under test wired to a single mocked consumer.
public KafkaBuffer<Record<Event>> createObjectUnderTest() {
return createObjectUnderTest(List.of(consumer));
}

public KafkaBuffer<Record<Event>> createObjectUnderTest(final List<KafkaCustomConsumer> consumers) {
try (
final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedConstruction<KafkaCustomProducerFactory> producerFactoryMock =
mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
producerFactory = mock;
when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull())).thenReturn(producer);
});
final MockedConstruction<KafkaCustomConsumerFactory> consumerFactoryMock =
mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> {
consumerFactory = mock;
when(consumerFactory.createConsumersForTopic(any(), any(), any(), any(), any(), any(), any())).thenReturn(consumers);
});
final MockedConstruction<BlockingBuffer> blockingBufferMock =
mockConstruction(BlockingBuffer.class, (mock, context) -> {
blockingBuffer = mock;
Expand Down Expand Up @@ -206,12 +225,100 @@ void test_kafkaBuffer_doWriteAll() throws Exception {
}

@Test
void test_kafkaBuffer_isEmpty() {
void test_kafkaBuffer_isEmpty_True() {
kafkaBuffer = createObjectUnderTest();
assertTrue(Objects.nonNull(kafkaBuffer));
when(blockingBuffer.isEmpty()).thenReturn(true);
when(consumer.isTopicEmpty()).thenReturn(true);

final boolean result = kafkaBuffer.isEmpty();
assertThat(result, equalTo(true));

verify(blockingBuffer).isEmpty();
verify(consumer).isTopicEmpty();
}

@Test
void test_kafkaBuffer_isEmpty_BufferNotEmpty() {
    // Topic is empty but the inner buffer still holds records,
    // so the buffer as a whole is not empty.
    kafkaBuffer = createObjectUnderTest();
    assertTrue(Objects.nonNull(kafkaBuffer));
    when(consumer.isTopicEmpty()).thenReturn(true);
    when(blockingBuffer.isEmpty()).thenReturn(false);

    assertThat(kafkaBuffer.isEmpty(), equalTo(false));

    verify(consumer).isTopicEmpty();
    verify(blockingBuffer).isEmpty();
}

@Test
void test_kafkaBuffer_isEmpty_TopicNotEmpty() {
    // A non-empty topic short-circuits the emptiness check before the
    // inner buffer is consulted.
    kafkaBuffer = createObjectUnderTest();
    assertTrue(Objects.nonNull(kafkaBuffer));
    // Do NOT stub blockingBuffer.isEmpty() here: verifyNoInteractions below
    // proves it is never called, so the stub would be unused and fail the
    // test with UnnecessaryStubbingException under Mockito strict stubbing.
    when(consumer.isTopicEmpty()).thenReturn(false);

    final boolean result = kafkaBuffer.isEmpty();
    assertThat(result, equalTo(false));

    verify(consumer).isTopicEmpty();
    verifyNoInteractions(blockingBuffer);
}

@Test
void test_kafkaBuffer_isEmpty_MultipleTopics_AllNotEmpty() {
    // Stream.allMatch short-circuits on the first false, so only ONE
    // consumer call happens (confirmed by verify with implicit times(1))
    // and the inner buffer is never consulted.
    kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
    assertTrue(Objects.nonNull(kafkaBuffer));
    // Single thenReturn(false): the original chained a second thenReturn
    // that could never be consumed, and also stubbed blockingBuffer even
    // though verifyNoInteractions proves it is untouched — both are unused
    // stubs that fail under Mockito strict stubbing.
    when(consumer.isTopicEmpty()).thenReturn(false);

    final boolean result = kafkaBuffer.isEmpty();
    assertThat(result, equalTo(false));

    verify(consumer).isTopicEmpty();
    verifyNoInteractions(blockingBuffer);
}

@Test
void test_kafkaBuffer_isEmpty_MultipleTopics_SomeNotEmpty() {
    // First consumer reports empty, second does not: allMatch must visit
    // both (times(2)) and return false without touching the inner buffer.
    kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
    assertTrue(Objects.nonNull(kafkaBuffer));
    // Do NOT stub blockingBuffer.isEmpty(): verifyNoInteractions below
    // proves it is never called, so the stub would be unused and fail
    // under Mockito strict stubbing.
    when(consumer.isTopicEmpty()).thenReturn(true).thenReturn(false);

    final boolean result = kafkaBuffer.isEmpty();
    assertThat(result, equalTo(false));

    verify(consumer, times(2)).isTopicEmpty();
    verifyNoInteractions(blockingBuffer);
}

@Test
void test_kafkaBuffer_isEmpty_MultipleTopics_AllEmpty() {
    // Every consumer reports an empty topic and the inner buffer is empty,
    // so the buffer as a whole is empty. A single thenReturn(true) covers
    // both invocations (a Mockito stub repeats its last answer).
    kafkaBuffer = createObjectUnderTest(List.of(consumer, consumer));
    assertTrue(Objects.nonNull(kafkaBuffer));
    when(consumer.isTopicEmpty()).thenReturn(true);
    when(blockingBuffer.isEmpty()).thenReturn(true);

    assertThat(kafkaBuffer.isEmpty(), equalTo(true));

    verify(consumer, times(2)).isTopicEmpty();
    verify(blockingBuffer).isEmpty();
}

@Test
void test_kafkaBuffer_isEmpty_ZeroTopics() {
    // With no consumers, allMatch over an empty stream is vacuously true,
    // so emptiness is decided solely by the inner buffer.
    kafkaBuffer = createObjectUnderTest(Collections.emptyList());
    assertTrue(Objects.nonNull(kafkaBuffer));
    when(blockingBuffer.isEmpty()).thenReturn(true);

    final boolean result = kafkaBuffer.isEmpty();
    assertThat(result, equalTo(true));

    // Invoke isEmpty() exactly once. The original called it a second time
    // right before verification, recording two blockingBuffer.isEmpty()
    // invocations and failing verify(blockingBuffer).isEmpty(), which
    // implies times(1).
    verify(blockingBuffer).isEmpty();
    verifyNoInteractions(consumer);
}

@Test
Expand Down
Loading

0 comments on commit 0c1fe06

Please sign in to comment.