diff --git a/data-prepper-plugins/s3-source/README.md b/data-prepper-plugins/s3-source/README.md
index 0da719066c..4dc4078a1e 100644
--- a/data-prepper-plugins/s3-source/README.md
+++ b/data-prepper-plugins/s3-source/README.md
@@ -88,6 +88,8 @@ The AWS configuration is the same for both SQS and S3.
 ### Counters
 
 * `s3ObjectsFailed` - The number of S3 objects that the S3 Source failed to read.
+* `s3ObjectsNotFound` - The number of S3 objects that the S3 Source failed to read due to a Not Found error from S3. These are also counted toward `s3ObjectsFailed`.
+* `s3ObjectsAccessDenied` - The number of S3 objects that the S3 Source failed to read due to an Access Denied or Forbidden error. These are also counted toward `s3ObjectsFailed`.
 * `s3ObjectsSucceeded` - The number of S3 objects that the S3 Source successfully read.
 * `sqsMessagesReceived` - The number of SQS messages received from the queue by the S3 Source.
 * `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source.
@@ -99,6 +101,12 @@ The AWS configuration is the same for both SQS and S3.
 * `s3ObjectReadTimeElapsed` - Measures the time the S3 Source takes to perform a request to GET an S3 object, parse it, and write Events to the buffer.
 * `sqsMessageDelay` - Measures the time from when S3 records an event time for the creation of an object to when it was fully parsed.
 
+### Distribution Summaries
+
+* `s3ObjectSizeBytes` - Measures the size of S3 objects as reported by the S3 `Content-Length`. For compressed objects, this is the compressed size.
+* `s3ObjectProcessedBytes` - Measures the bytes processed by the S3 source for a given object. For compressed objects, this is the un-compressed size.
+* `s3ObjectsEvents` - Measures the number of events (sometimes called records) produced by an S3 object.
+
 ## Developer Guide
 
 The integration tests for this plugin do not run as part of the Data Prepper build.
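For context on the README additions above: all of these meters are plain Micrometer meters obtained through `PluginMetrics`, exactly as the source changes below do. The following is a minimal, illustrative sketch only — the class and method names here are invented, not part of this change — showing how counters and a distribution summary with the documented names would be registered and recorded:

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;

// Illustrative sketch only: how the metrics documented above are emitted.
class S3ObjectMetricsSketch {
    private final Counter objectsFailed;
    private final Counter objectsNotFound;
    private final DistributionSummary objectSizeBytes;

    S3ObjectMetricsSketch(final PluginMetrics pluginMetrics) {
        objectsFailed = pluginMetrics.counter("s3ObjectsFailed");
        objectsNotFound = pluginMetrics.counter("s3ObjectsNotFound");
        objectSizeBytes = pluginMetrics.summary("s3ObjectSizeBytes");
    }

    void onObjectRead(final long contentLength) {
        objectSizeBytes.record(contentLength);   // compressed size, per the reported Content-Length
    }

    void onNotFound() {
        objectsNotFound.increment();   // a 404 is counted twice by design:
        objectsFailed.increment();     // once specifically, once in the general failure count
    }
}
```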
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java
index c121646e9d..7ce88edc49 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerIT.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import io.micrometer.core.instrument.DistributionSummary;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
@@ -77,8 +78,10 @@ void setUp() {
         pluginMetrics = mock(PluginMetrics.class);
 
         final Counter counter = mock(Counter.class);
+        final DistributionSummary distributionSummary = mock(DistributionSummary.class);
         final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER));
         when(pluginMetrics.counter(anyString())).thenReturn(counter);
+        when(pluginMetrics.summary(anyString())).thenReturn(distributionSummary);
         when(pluginMetrics.timer(anyString())).thenReturn(timer);
 
         bucketOwnerProvider = b -> Optional.empty();
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
index 0e5156435d..dd3936f93c 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/SqsWorkerIT.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import io.micrometer.core.instrument.DistributionSummary;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.plugins.source.configuration.OnErrorOption;
 import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
@@ -61,9 +62,11 @@ void setUp() {
         pluginMetrics = mock(PluginMetrics.class);
 
         final Counter sharedCounter = mock(Counter.class);
+        final DistributionSummary distributionSummary = mock(DistributionSummary.class);
         final Timer sqsMessageDelayTimer = mock(Timer.class);
 
         when(pluginMetrics.counter(anyString())).thenReturn(sharedCounter);
+        when(pluginMetrics.summary(anyString())).thenReturn(distributionSummary);
         when(pluginMetrics.timer(anyString())).thenReturn(sqsMessageDelayTimer);
 
         final SqsOptions sqsOptions = mock(SqsOptions.class);
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/BufferAccumulator.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/BufferAccumulator.java
index e55eb57f7f..8074e52758 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/BufferAccumulator.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/BufferAccumulator.java
@@ -24,6 +24,7 @@ class BufferAccumulator<T extends Record<?>> {
     private final Buffer<T> buffer;
     private final int numberOfRecordsToAccumulate;
     private final int bufferTimeoutMillis;
+    private int totalWritten = 0;
 
     private final Collection<T> recordsAccumulated;
 
@@ -55,9 +56,20 @@ void flush() throws Exception {
     }
 
     private void flushAccumulatedToBuffer() throws Exception {
-        if (recordsAccumulated.size() > 0) {
+        final int currentRecordCountAccumulated = recordsAccumulated.size();
+        if (currentRecordCountAccumulated > 0) {
             buffer.writeAll(recordsAccumulated, bufferTimeoutMillis);
             recordsAccumulated.clear();
+            totalWritten += currentRecordCountAccumulated;
         }
     }
+
+    /**
+     * Gets the total number of records written to the buffer.
+     *
+     * @return the total number of records written
+     */
+    public int getTotalWritten() {
+        return totalWritten;
+    }
 }
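The new `totalWritten` counter only advances inside `flushAccumulatedToBuffer()`, so `getTotalWritten()` reports records that actually reached the buffer rather than records merely added. A hedged usage sketch follows — the helper class, the batch size of 100, and the 3-second timeout are assumptions for illustration, not part of this change:

```java
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.List;

// Hypothetical helper living in the same package as the package-private BufferAccumulator.
class TotalWrittenSketch {
    static int writeAndCount(final Buffer<Record<Event>> buffer,
                             final List<Record<Event>> parsedRecords) throws Exception {
        final BufferAccumulator<Record<Event>> accumulator =
                BufferAccumulator.create(buffer, 100, Duration.ofMillis(3000));

        for (final Record<Event> record : parsedRecords) {
            accumulator.add(record);          // writes through to the buffer in batches of 100
        }
        accumulator.flush();                  // write any remainder

        return accumulator.getTotalWritten(); // counts records actually written, not merely added
    }
}
```

This is the value the worker below feeds into the `s3ObjectsEvents` summary after parsing an object.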
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
index fd09640907..a1d6955b66 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java
@@ -5,6 +5,10 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Timer;
+import org.apache.commons.compress.utils.CountingInputStream;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.buffer.Buffer;
 import org.opensearch.dataprepper.model.event.Event;
@@ -12,17 +16,16 @@
 import org.opensearch.dataprepper.plugins.source.codec.Codec;
 import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine;
 import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.http.HttpStatusCode;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.function.BiConsumer;
@@ -34,8 +37,13 @@ class S3ObjectWorker {
     private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class);
 
     static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
+    static final String S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME = "s3ObjectsNotFound";
+    static final String S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED = "s3ObjectsAccessDenied";
     static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
     static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
+    static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
+    static final String S3_OBJECTS_SIZE_PROCESSED = "s3ObjectProcessedBytes";
+    static final String S3_OBJECTS_EVENTS = "s3ObjectsEvents";
 
     private final S3Client s3Client;
     private final Buffer<Record<Event>> buffer;
@@ -46,8 +54,13 @@ class S3ObjectWorker {
     private final int numberOfRecordsToAccumulate;
     private final BiConsumer<Event, S3ObjectReference> eventConsumer;
     private final Counter s3ObjectsFailedCounter;
+    private final Counter s3ObjectsFailedNotFoundCounter;
+    private final Counter s3ObjectsFailedAccessDeniedCounter;
     private final Counter s3ObjectsSucceededCounter;
     private final Timer s3ObjectReadTimer;
+    private final DistributionSummary s3ObjectSizeSummary;
+    private final DistributionSummary s3ObjectSizeProcessedSummary;
+    private final DistributionSummary s3ObjectEventsSummary;
 
     public S3ObjectWorker(final S3Client s3Client,
                           final Buffer<Record<Event>> buffer,
@@ -68,8 +81,13 @@ public S3ObjectWorker(final S3Client s3Client,
         this.eventConsumer = eventConsumer;
 
         s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
+        s3ObjectsFailedNotFoundCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME);
+        s3ObjectsFailedAccessDeniedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED);
         s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
         s3ObjectReadTimer = pluginMetrics.timer(S3_OBJECTS_TIME_ELAPSED_METRIC_NAME);
+        s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
+        s3ObjectSizeProcessedSummary = pluginMetrics.summary(S3_OBJECTS_SIZE_PROCESSED);
+        s3ObjectEventsSummary = pluginMetrics.summary(S3_OBJECTS_EVENTS);
     }
 
     void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException {
@@ -99,8 +117,12 @@ void parseS3Object(final S3ObjectReference s3ObjectReference) throws IOException
     }
 
     private void doParseObject(final S3ObjectReference s3ObjectReference, final GetObjectRequest getObjectRequest, final BufferAccumulator<Record<Event>> bufferAccumulator) throws IOException {
+        final long s3ObjectSize;
+        final long totalBytesRead;
+
         try (final ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(getObjectRequest);
-             final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) {
+             final CountingInputStream inputStream = new CountingInputStream(compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream))) {
+            s3ObjectSize = responseInputStream.response().contentLength();
             codec.parse(inputStream, record -> {
                 try {
                     eventConsumer.accept(record.getData(), s3ObjectReference);
@@ -109,10 +131,14 @@ private void doParseObject(final S3ObjectReference s3ObjectReference, final GetO
                     LOG.error("Failed writing S3 objects to buffer.", e);
                 }
             });
-        } catch (final Exception e) {
-            LOG.error("Error reading from S3 object: s3ObjectReference={}.", s3ObjectReference, e);
+            totalBytesRead = inputStream.getBytesRead();
+        } catch (final Exception ex) {
+            LOG.error("Error reading from S3 object: s3ObjectReference={}.", s3ObjectReference, ex);
             s3ObjectsFailedCounter.increment();
-            throw e;
+            if(ex instanceof S3Exception) {
+                recordS3Exception((S3Exception) ex);
+            }
+            throw ex;
         }
 
         try {
@@ -120,5 +146,18 @@ private void doParseObject(final S3ObjectReference s3ObjectReference, final GetO
         } catch (final Exception e) {
             LOG.error("Failed writing S3 objects to buffer.", e);
         }
+
+        s3ObjectSizeSummary.record(s3ObjectSize);
+        s3ObjectSizeProcessedSummary.record(totalBytesRead);
+        s3ObjectEventsSummary.record(bufferAccumulator.getTotalWritten());
+    }
+
+    private void recordS3Exception(final S3Exception ex) {
+        if(ex.statusCode() == HttpStatusCode.NOT_FOUND) {
+            s3ObjectsFailedNotFoundCounter.increment();
+        }
+        else if(ex.statusCode() == HttpStatusCode.FORBIDDEN) {
+            s3ObjectsFailedAccessDeniedCounter.increment();
+        }
     }
 }
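The processed-bytes figure above comes from wrapping the decompressed stream in commons-compress's `CountingInputStream` and reading `getBytesRead()` once parsing finishes, while the object-size figure is the raw `Content-Length` from the GET response. A small self-contained sketch of the counting pattern — the class name and sample payload are made up; in the worker the wrapped stream is whatever `CompressionEngine` returns:

```java
import org.apache.commons.compress.utils.CountingInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch only: the counting pattern behind s3ObjectProcessedBytes.
public class CountingStreamSketch {
    public static void main(final String[] args) throws IOException {
        final byte[] payload = "line one\nline two\n".getBytes(StandardCharsets.UTF_8);

        try (final CountingInputStream countingStream =
                     new CountingInputStream(new ByteArrayInputStream(payload))) {
            while (countingStream.read() != -1) {
                // a codec would turn these bytes into events here
            }
            // Reports bytes that flowed through the wrapper; when the wrapped stream is a
            // decompressing stream, this is the un-compressed byte count.
            System.out.println("bytes processed: " + countingStream.getBytesRead());
        }
    }
}
```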
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/BufferAccumulatorTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/BufferAccumulatorTest.java
index 2fe3cc5d43..24d50c3da7 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/BufferAccumulatorTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/BufferAccumulatorTest.java
@@ -157,6 +157,66 @@ void flush_after_add_writes_to_buffer() throws Exception {
         assertThat(actualRecordsWritten, equalTo(Collections.singletonList(record)));
     }
 
+    @Test
+    void getTotalWritten_returns_zero_if_no_writes() throws Exception {
+        assertThat(createObjectUnderTest().getTotalWritten(), equalTo(0));
+    }
+
+    @Test
+    void getTotalWritten_returns_accumulated_after_single_write() throws Exception {
+        final BufferAccumulator objectUnderTest = createObjectUnderTest();
+        final Record record = createRecord();
+        objectUnderTest.add(record);
+
+        objectUnderTest.flush();
+
+        assertThat(objectUnderTest.getTotalWritten(), equalTo(1));
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 10, 20})
+    void getTotalWritten_returns_accumulated_after_single_write(final int recordsInWrite) throws Exception {
+        recordsToAccumulate = recordsInWrite;
+        final BufferAccumulator objectUnderTest = createObjectUnderTest();
+
+        for (int i = 0; i < recordsInWrite; i++) {
+            objectUnderTest.add(createRecord());
+        }
+
+        assertThat(objectUnderTest.getTotalWritten(), equalTo(recordsInWrite));
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {2, 10, 20})
+    void getTotalWritten_returns_accumulated_after_multiple_writes(final int recordsInWrite) throws Exception {
+        recordsToAccumulate = 10;
+        final BufferAccumulator objectUnderTest = createObjectUnderTest();
+
+        objectUnderTest.flush();
+
+        for (int writes = 0; writes < recordsInWrite; writes++) {
+            for (int r = 0; r < recordsToAccumulate; r++) {
+                objectUnderTest.add(createRecord());
+            }
+        }
+
+        assertThat(objectUnderTest.getTotalWritten(), equalTo(recordsInWrite * recordsToAccumulate));
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 10, 15})
+    void getTotalWritten_returns_flushed_data(final int accumulationCount) throws Exception {
+        final BufferAccumulator objectUnderTest = createObjectUnderTest();
+
+        for (int i = 0; i < accumulationCount; i++) {
+            objectUnderTest.add(createRecord());
+        }
+
+        objectUnderTest.flush();
+
+        assertThat(objectUnderTest.getTotalWritten(), equalTo(accumulationCount));
+    }
+
     private Record createRecord() {
         return mock(Record.class);
     }
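The `S3ObjectWorkerTest` changes that follow rely on stubbing `PluginMetrics` so that every meter it hands out is a Mockito mock, which lets the assertions verify `record()` and `increment()` calls without a real meter registry. A condensed, hypothetical sketch of that pattern on its own (the class and test names here are invented):

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.metrics.PluginMetrics;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// Sketch only: the stubbing pattern used by the unit tests below.
class PluginMetricsStubbingSketch {
    @Test
    void summaries_can_be_verified_without_a_registry() {
        final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
        final Counter counter = mock(Counter.class);
        final DistributionSummary summary = mock(DistributionSummary.class);
        final Timer timer = mock(Timer.class);

        when(pluginMetrics.counter(anyString())).thenReturn(counter);
        when(pluginMetrics.summary(anyString())).thenReturn(summary);
        when(pluginMetrics.timer(anyString())).thenReturn(timer);

        // In the real tests, the worker under test is constructed with pluginMetrics
        // and makes calls like this one internally.
        pluginMetrics.summary("s3ObjectSizeBytes").record(1_024);

        verify(summary).record(1_024);
    }
}
```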
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java
index 5bcd75bc12..0cb6182587 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorkerTest.java
@@ -5,15 +5,11 @@
 
 package org.opensearch.dataprepper.plugins.source;
 
-import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.record.Record;
-import org.opensearch.dataprepper.plugins.source.codec.Codec;
-import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine;
-import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
 import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
 import io.micrometer.core.instrument.Timer;
+import org.apache.commons.compress.utils.CountingInputStream;
+import org.apache.commons.compress.utils.IOUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -22,11 +18,21 @@
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.codec.Codec;
+import org.opensearch.dataprepper.plugins.source.compression.CompressionEngine;
+import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
@@ -38,15 +44,17 @@
 import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
@@ -82,21 +90,36 @@ class S3ObjectWorkerTest {
     @Mock
     private Counter s3ObjectsFailedCounter;
     @Mock
+    private Counter s3ObjectsFailedNotFoundCounter;
+    @Mock
+    private Counter s3ObjectsFailedAccessDeniedCounter;
+    @Mock
     private Counter s3ObjectsSucceededCounter;
     @Mock
     private Timer s3ObjectReadTimer;
     @Mock
+    private DistributionSummary s3ObjectSizeSummary;
+    @Mock
+    private DistributionSummary s3ObjectSizeProcessedSummary;
+    @Mock
+    private DistributionSummary s3ObjectEventsSummary;
+    @Mock
     private BiConsumer<Event, S3ObjectReference> eventConsumer;
 
     private String bucketName;
     private String key;
+    @Mock
     private ResponseInputStream<GetObjectResponse> objectInputStream;
+    @Mock
+    private GetObjectResponse getObjectResponse;
 
     private Exception exceptionThrownByCallable;
+    private Random random;
+    private long objectSize;
 
     @BeforeEach
     void setUp() throws Exception {
-        final Random random = new Random();
+        random = new Random();
 
         bufferTimeout = Duration.ofMillis(random.nextInt(100) + 100);
         recordsToAccumulate = random.nextInt(10) + 2;
@@ -105,6 +128,8 @@ void setUp() throws Exception {
         when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
         when(s3ObjectReference.getKey()).thenReturn(key);
 
+        objectSize = random.nextInt(100_000) + 10_000;
+
         exceptionThrownByCallable = null;
         when(s3ObjectReadTimer.recordCallable(any(Callable.class)))
                 .thenAnswer(a -> {
@@ -118,10 +143,17 @@ void setUp() throws Exception {
                 });
 
         when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_FAILED_METRIC_NAME)).thenReturn(s3ObjectsFailedCounter);
+        when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_FAILED_NOT_FOUND_METRIC_NAME)).thenReturn(s3ObjectsFailedNotFoundCounter);
+        when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_FAILED_NOT_FOUND_ACCESS_DENIED)).thenReturn(s3ObjectsFailedAccessDeniedCounter);
         when(pluginMetrics.counter(S3ObjectWorker.S3_OBJECTS_SUCCEEDED_METRIC_NAME)).thenReturn(s3ObjectsSucceededCounter);
         when(pluginMetrics.timer(S3ObjectWorker.S3_OBJECTS_TIME_ELAPSED_METRIC_NAME)).thenReturn(s3ObjectReadTimer);
+        when(pluginMetrics.summary(S3ObjectWorker.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary);
+        when(pluginMetrics.summary(S3ObjectWorker.S3_OBJECTS_SIZE_PROCESSED)).thenReturn(s3ObjectSizeProcessedSummary);
+        when(pluginMetrics.summary(S3ObjectWorker.S3_OBJECTS_EVENTS)).thenReturn(s3ObjectEventsSummary);
 
-        objectInputStream = mock(ResponseInputStream.class);
+        lenient().when(objectInputStream.response()).thenReturn(getObjectResponse);
+        lenient().when(getObjectResponse.contentLength()).thenReturn(objectSize);
+        lenient().when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(objectInputStream);
     }
 
     private S3ObjectWorker createObjectUnderTest() {
@@ -148,7 +180,6 @@ void parseS3Object_calls_getObject_with_correct_GetObjectRequest() throws IOExce
 
     @Test
     void parseS3Object_calls_getObject_with_correct_GetObjectRequest_when_bucketOwner_is_present() throws IOException {
-        final ResponseInputStream objectInputStream = mock(ResponseInputStream.class);
         when(s3Client.getObject(any(GetObjectRequest.class)))
                 .thenReturn(objectInputStream);
 
@@ -176,7 +207,10 @@ void parseS3Object_calls_Codec_parse_on_S3InputStream() throws Exception {
 
         createObjectUnderTest().parseS3Object(s3ObjectReference);
 
-        verify(codec).parse(eq(objectInputStream), any(Consumer.class));
+        final ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
+        verify(codec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class));
+        final InputStream actualInputStream = inputStreamArgumentCaptor.getValue();
+        assertThat(actualInputStream, instanceOf(CountingInputStream.class));
     }
 
     @Test
@@ -229,7 +263,6 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce
 
     @Test
     void parseS3Object_increments_success_counter_after_parsing_S3_object() throws IOException {
-        final ResponseInputStream objectInputStream = mock(ResponseInputStream.class);
        when(s3Client.getObject(any(GetObjectRequest.class)))
                 .thenReturn(objectInputStream);
 
@@ -292,6 +325,44 @@ void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_t
         assertThat(exceptionThrownByCallable, sameInstance(expectedException));
     }
 
+    @Test
+    void parseS3Object_throws_Exception_and_increments_NotFound_counter_when_GetObject_from_S3_is_404() {
+        final S3Exception expectedException = mock(S3Exception.class);
+        when(expectedException.statusCode()).thenReturn(404);
+        when(s3Client.getObject(any(GetObjectRequest.class)))
+                .thenThrow(expectedException);
+
+        final S3ObjectWorker objectUnderTest = createObjectUnderTest();
+        final S3Exception actualException = assertThrows(S3Exception.class, () -> objectUnderTest.parseS3Object(s3ObjectReference));
+
+        assertThat(actualException, sameInstance(expectedException));
+
+        verify(s3ObjectsFailedCounter).increment();
+        verify(s3ObjectsFailedNotFoundCounter).increment();
+        verifyNoInteractions(s3ObjectsSucceededCounter);
+        verifyNoInteractions(s3ObjectsFailedAccessDeniedCounter);
+        assertThat(exceptionThrownByCallable, sameInstance(expectedException));
+    }
+
+    @Test
+    void parseS3Object_throws_Exception_and_increments_AccessDenied_counter_when_GetObject_from_S3_is_403() {
+        final S3Exception expectedException = mock(S3Exception.class);
+        when(expectedException.statusCode()).thenReturn(403);
+        when(s3Client.getObject(any(GetObjectRequest.class)))
+                .thenThrow(expectedException);
+
+        final S3ObjectWorker objectUnderTest = createObjectUnderTest();
+        final S3Exception actualException = assertThrows(S3Exception.class, () -> objectUnderTest.parseS3Object(s3ObjectReference));
+
+        assertThat(actualException, sameInstance(expectedException));
+
+        verify(s3ObjectsFailedCounter).increment();
+        verify(s3ObjectsFailedAccessDeniedCounter).increment();
+        verifyNoInteractions(s3ObjectsSucceededCounter);
+        verifyNoInteractions(s3ObjectsFailedNotFoundCounter);
+        assertThat(exceptionThrownByCallable, sameInstance(expectedException));
+    }
+
     @Test
     void parseS3Object_throws_Exception_and_increments_failure_counter_when_CompressionEngine_fails() throws IOException {
         when(s3Client.getObject(any(GetObjectRequest.class)))
@@ -311,7 +382,6 @@ void parseS3Object_calls_GetObject_after_Callable() throws Exception {
 
     @Test
     void parseS3Object_calls_GetObject_after_Callable() throws Exception {
-        final ResponseInputStream objectInputStream = mock(ResponseInputStream.class);
         when(s3Client.getObject(any(GetObjectRequest.class)))
                 .thenReturn(objectInputStream);
 
@@ -323,4 +393,53 @@ void parseS3Object_calls_GetObject_after_Callable() throws Exception {
         inOrder.verify(s3ObjectReadTimer).recordCallable(any(Callable.class));
         inOrder.verify(s3Client).getObject(any(GetObjectRequest.class));
     }
+
+    @Test
+    void parseS3Object_records_BufferAccumulator_getTotalWritten() throws IOException {
+        when(s3Client.getObject(any(GetObjectRequest.class)))
+                .thenReturn(objectInputStream);
+        when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(objectInputStream);
+
+        final int totalWritten = new Random().nextInt(10_000) + 5_000;
+        final BufferAccumulator bufferAccumulator = mock(BufferAccumulator.class);
+        when(bufferAccumulator.getTotalWritten()).thenReturn(totalWritten);
+        try (final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
+            bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, recordsToAccumulate, bufferTimeout))
+                    .thenReturn(bufferAccumulator);
+            createObjectUnderTest().parseS3Object(s3ObjectReference);
+        }
+
+        verify(s3ObjectEventsSummary).record(totalWritten);
+    }
+
+    @Test
+    void parseS3Object_records_S3_ObjectSize() throws IOException {
+        when(s3Client.getObject(any(GetObjectRequest.class)))
+                .thenReturn(objectInputStream);
+        when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(objectInputStream);
+
+        createObjectUnderTest().parseS3Object(s3ObjectReference);
+
+        verify(s3ObjectSizeSummary).record(objectSize);
+    }
+
+    @Test
+    void parseS3Object_records_input_stream_bytes_read() throws IOException {
+        when(s3Client.getObject(any(GetObjectRequest.class)))
+                .thenReturn(objectInputStream);
+        final int inputStringLength = random.nextInt(1000) + 10;
+        final byte[] inputStreamBytes = new byte[inputStringLength];
+        random.nextBytes(inputStreamBytes);
+        final InputStream compressionInputStream = new ByteArrayInputStream(inputStreamBytes);
+        when(compressionEngine.createInputStream(key, objectInputStream)).thenReturn(compressionInputStream);
+        doAnswer(a -> {
+            final InputStream inputStream = a.getArgument(0);
+            IOUtils.copy(inputStream, new ByteArrayOutputStream());
+            return a;
+        }).when(codec).parse(any(InputStream.class), any(Consumer.class));
+
+        createObjectUnderTest().parseS3Object(s3ObjectReference);
+
+        verify(s3ObjectSizeProcessedSummary).record(inputStringLength);
+    }
 }
\ No newline at end of file
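As a closing note on the error classification exercised by the 404 and 403 tests above: the worker keys off the HTTP status code carried by the thrown `S3Exception`. A small, hedged sketch of that mapping in isolation — the class and method are hypothetical, and returning a metric name (rather than incrementing counters, as the worker does) is a simplification for illustration:

```java
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.model.S3Exception;

// Sketch only: mirrors the status-code mapping in recordS3Exception above.
final class S3FailureClassifier {
    private S3FailureClassifier() {
    }

    static String failureMetricFor(final S3Exception exception) {
        if (exception.statusCode() == HttpStatusCode.NOT_FOUND) {
            return "s3ObjectsNotFound";        // 404: object or bucket missing
        }
        if (exception.statusCode() == HttpStatusCode.FORBIDDEN) {
            return "s3ObjectsAccessDenied";    // 403: access denied / forbidden
        }
        return "s3ObjectsFailed";              // in the worker, every failure also increments this counter
    }
}
```

In the worker itself the general `s3ObjectsFailed` counter is always incremented and the specific counter is incremented in addition, which is why the README notes that the specific metrics are also counted toward `s3ObjectsFailed`.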