diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java index 866f13aac2..33e8832662 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java @@ -75,6 +75,12 @@ void getDrainTimeout_returns_buffer_drain_timeout() { assertThat(result, equalTo(duration)); } + @Test + void shutdown_calls_buffer_shutdown() { + createObjectUnderTest().shutdown(); + verify(buffer).shutdown(); + } + @Nested class NoCircuitBreakerChecks { @AfterEach diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 57b14171ec..ff943c248d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.Spy; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.processor.Processor; @@ -107,7 +108,8 @@ void testPipelineState() { final TestSink testSink = new TestSink(); final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); - final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + final Buffer buffer = spy(new BlockingBuffer(TEST_PIPELINE_NAME)); + final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, buffer, Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); @@ -119,6 +121,7 @@ void testPipelineState() { testPipeline.shutdown(); assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true)); assertThat("Sink shutdown should be called", testSink.isShutdown, is(true)); + verify(buffer).shutdown(); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java index 028e7dcf8a..003e47c024 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java @@ -201,6 +201,26 @@ void getDrainTimeout_MultipleSecondaryBuffers_ReturnsSumOfDurations() { verify(secondaryBuffer, times(2)).getDrainTimeout(); } + @Test + void shutdown_NoSecondaryBuffers_CallsPrimaryBufferShutdown() { + createObjectUnderTest(0).shutdown(); + verify(primaryBuffer).shutdown(); + } + + @Test + void shutdown_OneSecondaryBuffers_CallsPrimaryAndSecondaryBufferShutdown() { + createObjectUnderTest(1).shutdown(); + verify(primaryBuffer).shutdown(); + verify(secondaryBuffer).shutdown(); + } + + @Test + void shutdown_MultipleSecondaryBuffers_CallsAllBuffersShutdown() { + createObjectUnderTest(2).shutdown(); + verify(primaryBuffer).shutdown(); + verify(secondaryBuffer, times(2)).shutdown(); + } + private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) { final List secondaryBuffers = IntStream.range(0, secondaryBufferCount) .mapToObj(i -> secondaryBuffer) diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 37a9c6e8bf..6a3540b8cf 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; @@ -46,13 +47,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -232,4 +236,37 @@ void test_kafkaBuffer_getDrainTimeout() { verify(bufferConfig).getDrainTimeout(); } + + @Test + public void testShutdown_Successful() throws InterruptedException { + kafkaBuffer = createObjectUnderTest(); + lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(true); + + kafkaBuffer.shutdown(); + verify(executorService).shutdown(); + verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); + } + + @Test + public void testShutdown_Timeout() throws InterruptedException { + kafkaBuffer = createObjectUnderTest(); + lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(false); + + kafkaBuffer.shutdown(); + verify(executorService).shutdown(); + verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); + verify(executorService).shutdownNow(); + } + + @Test + public void testShutdown_InterruptedException() throws InterruptedException { + kafkaBuffer = createObjectUnderTest(); + lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))) + .thenThrow(new InterruptedException()); + + kafkaBuffer.shutdown(); + verify(executorService).shutdown(); + verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)); + verify(executorService).shutdownNow(); + } } \ No newline at end of file