Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 9, 2023
1 parent d44976f commit 0dfed1f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ void getDrainTimeout_returns_buffer_drain_timeout() {
assertThat(result, equalTo(duration));
}

@Test
void shutdown_calls_buffer_shutdown() {
createObjectUnderTest().shutdown();
verify(buffer).shutdown();
}

@Nested
class NoCircuitBreakerChecks {
@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Spy;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -107,7 +108,8 @@ void testPipelineState() {
final TestSink testSink = new TestSink();
final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME),
final Buffer buffer = spy(new BlockingBuffer(TEST_PIPELINE_NAME));
final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, buffer,
Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT,
processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout);
Expand All @@ -119,6 +121,7 @@ void testPipelineState() {
testPipeline.shutdown();
assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true));
assertThat("Sink shutdown should be called", testSink.isShutdown, is(true));
verify(buffer).shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,26 @@ void getDrainTimeout_MultipleSecondaryBuffers_ReturnsSumOfDurations() {
verify(secondaryBuffer, times(2)).getDrainTimeout();
}

@Test
void shutdown_NoSecondaryBuffers_CallsPrimaryBufferShutdown() {
createObjectUnderTest(0).shutdown();
verify(primaryBuffer).shutdown();
}

@Test
void shutdown_OneSecondaryBuffers_CallsPrimaryAndSecondaryBufferShutdown() {
createObjectUnderTest(1).shutdown();
verify(primaryBuffer).shutdown();
verify(secondaryBuffer).shutdown();
}

@Test
void shutdown_MultipleSecondaryBuffers_CallsAllBuffersShutdown() {
createObjectUnderTest(2).shutdown();
verify(primaryBuffer).shutdown();
verify(secondaryBuffer, times(2)).shutdown();
}

private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
.mapToObj(i -> secondaryBuffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -46,13 +47,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -232,4 +236,37 @@ void test_kafkaBuffer_getDrainTimeout() {

verify(bufferConfig).getDrainTimeout();
}

@Test
public void testShutdown_Successful() throws InterruptedException {
kafkaBuffer = createObjectUnderTest();
lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(true);

kafkaBuffer.shutdown();
verify(executorService).shutdown();
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
}

@Test
public void testShutdown_Timeout() throws InterruptedException {
kafkaBuffer = createObjectUnderTest();
lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(false);

kafkaBuffer.shutdown();
verify(executorService).shutdown();
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(executorService).shutdownNow();
}

@Test
public void testShutdown_InterruptedException() throws InterruptedException {
kafkaBuffer = createObjectUnderTest();
lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)))
.thenThrow(new InterruptedException());

kafkaBuffer.shutdown();
verify(executorService).shutdown();
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(executorService).shutdownNow();
}
}

0 comments on commit 0dfed1f

Please sign in to comment.