From 2ea59c601ccf6c55e4e2bb02aec773e29c4edca5 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Thu, 17 Aug 2023 17:18:45 -0500 Subject: [PATCH] Fix rebase conflicts Signed-off-by: Chase Engelbrecht --- .../dataprepper/plugins/sink/s3/S3SinkService.java | 4 ++-- .../dataprepper/plugins/sink/s3/S3SinkServiceTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java index e1b1591f3c..7007259ebf 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java @@ -144,7 +144,7 @@ private void flushToS3IfNeeded() { if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { try { codec.complete(currentBuffer.getOutputStream()); - final String s3Key = generateKey(codec); + String s3Key = currentBuffer.getKey(); LOG.info("Writing {} to S3 with {} events and size of {} bytes.", s3Key, currentBuffer.getEventCount(), currentBuffer.getSize()); final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key); @@ -160,7 +160,7 @@ private void flushToS3IfNeeded() { objectsFailedCounter.increment(); releaseEventHandles(false); } - currentBuffer = bufferFactory.getBuffer(); + currentBuffer = bufferFactory.getBuffer(s3Client, () -> bucket, keyGenerator::generateKey); } catch (final IOException e) { LOG.error("Exception while completing codec", e); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java index 744ea8cd32..cb311d178a 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceTest.java @@ -291,7 +291,7 @@ void test_output_with_no_incoming_records_flushes_batch() throws IOException { bufferFactory = mock(BufferFactory.class); Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer()).thenReturn(buffer); + when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(buffer.getEventCount()).thenReturn(10); final OutputStream outputStream = mock(OutputStream.class); @@ -301,7 +301,7 @@ void test_output_with_no_incoming_records_flushes_batch() throws IOException { s3SinkService.output(Collections.emptyList()); verify(snapshotSuccessCounter, times(1)).increment(); - verify(buffer, times(1)).flushToS3(any(), anyString(), anyString()); + verify(buffer, times(1)).flushToS3(); } @Test @@ -309,7 +309,7 @@ void test_output_with_no_incoming_records_or_buffered_records_short_circuits() t bufferFactory = mock(BufferFactory.class); Buffer buffer = mock(Buffer.class); - when(bufferFactory.getBuffer()).thenReturn(buffer); + when(bufferFactory.getBuffer(any(S3Client.class), any(), any())).thenReturn(buffer); when(buffer.getEventCount()).thenReturn(0); final long objectSize = random.nextInt(1_000_000) + 10_000; when(buffer.getSize()).thenReturn(objectSize); @@ -321,7 +321,7 @@ void test_output_with_no_incoming_records_or_buffered_records_short_circuits() t s3SinkService.output(Collections.emptyList()); verify(snapshotSuccessCounter, times(0)).increment(); - verify(buffer, times(0)).flushToS3(any(), anyString(), anyString()); + verify(buffer, times(0)).flushToS3(); } @Test