diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 32f48eba..26f3c03c 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -144,11 +143,9 @@ private List readNext() { return sourceRecords; } - try (Stream recordStream = transformer.getRecords(s3Object, topic, topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) { - final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8); final Iterator recordIterator = recordStream.iterator(); while (recordIterator.hasNext()) { final Object record = recordIterator.next(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 35b0b5ea..10939c51 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -27,7 +27,6 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import com.amazonaws.regions.Regions; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; @@ -42,7 +41,7 @@ void correctFullConfig() { props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket"); props.put(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT"); props.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX"); - props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); + props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id()); // record, topic specific props props.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java index 58c956d8..8b34f73d 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java @@ -32,7 +32,6 @@ import io.aiven.kafka.connect.common.config.CompressionType; -import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.github.luben.zstd.ZstdInputStream; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; @@ -47,6 +46,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; public class BucketAccessor { @@ -79,11 +79,9 @@ public final void removeBucket() { .bucket(bucketName) .delete(Delete.builder().objects(deleteIds).build()) .build()); - } catch (final MultiObjectDeleteException e) { - for (final var err : e.getErrors()) { - LOGGER.warn(String.format("Couldn't delete object: %s. Reason: [%s] %s", err.getKey(), err.getCode(), - err.getMessage())); - } + } catch (final S3Exception e) { + LOGGER.warn( + String.format("Couldn't delete objects. Reason: [%s] %s", e.awsErrorDetails().errorMessage(), e)); } catch (final SdkException e) { LOGGER.error("Couldn't delete objects: {}, Exception{} ", deleteIds, e.getMessage()); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java deleted file mode 100644 index 4d33e46c..00000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.source.testutils; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartRequest; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class S3OutputStream extends OutputStream { - - private final Logger logger = LoggerFactory.getLogger(S3OutputStream.class); - - public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; // 5 MB - - private final AmazonS3 client; - - private final ByteBuffer byteBuffer; - - private final String bucketName; - - private final String key; - - private MultipartUpload multipartUpload; - - private final int partSize; - - private final String serverSideEncryptionAlgorithm; - - private boolean closed; - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client) { - this(bucketName, key, partSize, client, null); - } - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client, - final String serverSideEncryptionAlgorithm) { - super(); - this.bucketName = bucketName; - this.key = key; - this.client = client; - this.partSize = partSize; - this.byteBuffer = ByteBuffer.allocate(partSize); - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - } - - @Override - public void write(final int singleByte) throws IOException { - write(new byte[] { (byte) singleByte }, 0, 1); - } - - @Override - public void write(final byte[] bytes, final int off, final int len) throws IOException { - if (Objects.isNull(bytes) || bytes.length == 0) { - return; - } - if (Objects.isNull(multipartUpload)) { - multipartUpload = newMultipartUpload(); - } - final var source = ByteBuffer.wrap(bytes, off, len); - while (source.hasRemaining()) { - final var transferred = Math.min(byteBuffer.remaining(), source.remaining()); - final var offset = source.arrayOffset() + source.position(); - byteBuffer.put(source.array(), offset, transferred); - source.position(source.position() + transferred); - if (!byteBuffer.hasRemaining()) { - flushBuffer(0, partSize, partSize); - } - } - } - - private MultipartUpload newMultipartUpload() throws IOException { - logger.debug("Create new multipart upload request"); - final var initialRequest = new InitiateMultipartUploadRequest(bucketName, key); - initialRequest.setObjectMetadata(this.buildObjectMetadata()); - final var initiateResult = client.initiateMultipartUpload(initialRequest); - logger.debug("Upload ID: {}", initiateResult.getUploadId()); - return new MultipartUpload(initiateResult.getUploadId()); - } - - private ObjectMetadata buildObjectMetadata() { - final ObjectMetadata metadata = new ObjectMetadata(); - - if (this.serverSideEncryptionAlgorithm != null) { - metadata.setSSEAlgorithm(this.serverSideEncryptionAlgorithm); - } - - return metadata; - } - - @Override - public void close() throws IOException { - if (closed) { - return; - } - if (byteBuffer.position() > 0 && Objects.nonNull(multipartUpload)) { - flushBuffer(byteBuffer.arrayOffset(), byteBuffer.position(), byteBuffer.position()); - } - if (Objects.nonNull(multipartUpload)) { - multipartUpload.complete(); - multipartUpload = null; // NOPMD NullAssignment - } - closed = true; - super.close(); - } - - private void flushBuffer(final int offset, final int length, final int partSize) throws IOException { - try { - multipartUpload.uploadPart(new ByteArrayInputStream(byteBuffer.array(), offset, length), partSize); - byteBuffer.clear(); - } catch (final Exception e) { // NOPMD AvoidCatchingGenericException - multipartUpload.abort(); - multipartUpload = null; // NOPMD NullAssignment - throw new IOException(e); - } - } - - private class MultipartUpload { - - private final String uploadId; - - private final List partETags = new ArrayList<>(); - - public MultipartUpload(final String uploadId) { - this.uploadId = uploadId; - } - - public void uploadPart(final InputStream inputStream, final int partSize) throws IOException { - final var partNumber = partETags.size() + 1; - final var uploadPartRequest = new UploadPartRequest().withBucketName(bucketName) - .withKey(key) - .withUploadId(uploadId) - .withPartSize(partSize) - .withPartNumber(partNumber) - .withInputStream(inputStream); - final var uploadResult = client.uploadPart(uploadPartRequest); - partETags.add(uploadResult.getPartETag()); - } - - public void complete() { - client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)); - } - - public void abort() { - client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId)); - } - - } - -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 980b4435..b701ea85 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.Collections; import java.util.stream.Stream; @@ -38,7 +39,6 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,8 +64,7 @@ void testIteratorProcessesS3Objects() throws Exception { final String key = "topic-00001-abc123.txt"; // Mock InputStream - try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { + try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) @@ -96,8 +95,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { final String key = "topic-00001-abc123.txt"; // Mock InputStream - try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { + try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); // With ByteArrayTransformer