diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 8cfd1ddd43..7e27db0afd 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.util.Map; @@ -102,10 +103,9 @@ private void processAcknowledgements(List inputEvents, Collection inputEvents = null; if (acknowledgementsEnabled) { - inputEvents = ((ArrayList>)records).stream().map(Record::getData).collect(Collectors.toList()); + inputEvents = ((ArrayList>) records).stream().map(Record::getData).collect(Collectors.toList()); } - records = processor.execute(records); - if (inputEvents != null) { - processAcknowledgements(inputEvents, records); + + try { + records = processor.execute(records); + if (inputEvents != null) { + processAcknowledgements(inputEvents, records); + } + } catch (final Exception e) { + LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); + if (inputEvents != null) { + processAcknowledgements(inputEvents, Collections.emptyList()); + } + + records = Collections.emptyList(); + break; } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java new file mode 100644 index 0000000000..9b31b20691 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java @@ -0,0 +1,211 @@ +package org.opensearch.dataprepper.pipeline; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.pipeline.common.FutureHelper; +import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class ProcessWorkerTest { + + @Mock + private Pipeline pipeline; + + @Mock + private Buffer buffer; + + @Mock + private Source source; + + private List> sinkFutures; + + private List processors; + + @BeforeEach + void setup() { + when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true); + when(source.areAcknowledgementsEnabled()).thenReturn(false); + when(pipeline.getSource()).thenReturn(source); + when(buffer.isEmpty()).thenReturn(true); + when(pipeline.getPeerForwarderDrainTimeout()).thenReturn(Duration.ofMillis(100)); + when(pipeline.getReadBatchTimeoutInMillis()).thenReturn(500); + + final Future sinkFuture = mock(Future.class); + sinkFutures = List.of(sinkFuture); + when(pipeline.publishToSinks(any())).thenReturn(sinkFutures); + } + + private ProcessWorker createObjectUnderTest() { + return new ProcessWorker(buffer, processors, pipeline); + } + + @Test + void testProcessWorkerHappyPath() { + + final List records = List.of(mock(Record.class)); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenReturn(records); + when(processor.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + } + + @Test + void testProcessWorkerHappyPathWithAcknowledgments() { + + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final List> records = new ArrayList<>(); + final Record mockRecord = mock(Record.class); + final Event mockEvent = mock(Event.class); + final EventHandle eventHandle = mock(DefaultEventHandle.class); + when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); + when(mockRecord.getData()).thenReturn(mockEvent); + when(mockEvent.getEventHandle()).thenReturn(eventHandle); + + records.add(mockRecord); + + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenReturn(records); + when(processor.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + } + + @Test + void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() { + + final List records = List.of(mock(Record.class)); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenThrow(RuntimeException.class); + when(processor.isReadyForShutdown()).thenReturn(true); + + final Processor skippedProcessor = mock(Processor.class); + when(skippedProcessor.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor, skippedProcessor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + + verify(skippedProcessor, never()).execute(any()); + } + + @Test + void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsHandledProperly() { + + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final List> records = new ArrayList<>(); + final Record mockRecord = mock(Record.class); + final Event mockEvent = mock(Event.class); + final EventHandle eventHandle = mock(DefaultEventHandle.class); + when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); + doNothing().when(eventHandle).release(true); + when(mockRecord.getData()).thenReturn(mockEvent); + when(mockEvent.getEventHandle()).thenReturn(eventHandle); + + records.add(mockRecord); + + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenThrow(RuntimeException.class); + when(processor.isReadyForShutdown()).thenReturn(true); + + final Processor skippedProcessor = mock(Processor.class); + when(skippedProcessor.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor, skippedProcessor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + + verify(skippedProcessor, never()).execute(any()); + } +}