From f8bf46ef3ebeea94840aca39cd86687b6d336229 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 17 Dec 2024 08:59:53 +0000 Subject: [PATCH 1/5] Update to move existing aws integration to the AWS SDK 2.X which has long term support. Signed-off-by: Aindriu Lavelle --- s3-commons/build.gradle.kts | 3 + .../connect/config/s3/S3ConfigFragment.java | 41 +++++++- .../iam/AwsCredentialProviderFactory.java | 34 +++++++ s3-source-connector/build.gradle.kts | 10 +- .../connect/s3/source/IntegrationBase.java | 24 ++--- .../connect/s3/source/IntegrationTest.java | 16 ++-- .../kafka/connect/s3/source/S3SourceTask.java | 10 +- .../s3/source/config/S3ClientFactory.java | 59 ++++++------ .../s3/source/config/S3SourceConfig.java | 20 +--- .../s3/source/utils/AWSV2SourceClient.java | 66 +++++++------ .../s3/source/utils/SourceRecordIterator.java | 37 ++++---- .../connect/s3/source/S3SourceTaskTest.java | 53 +++++++---- .../s3/source/config/S3SourceConfigTest.java | 8 +- .../s3/source/testutils/BucketAccessor.java | 61 +++++++----- .../source/utils/AWSV2SourceClientTest.java | 95 +++++++++---------- .../utils/SourceRecordIteratorTest.java | 21 ++-- 16 files changed, 329 insertions(+), 229 deletions(-) diff --git a/s3-commons/build.gradle.kts b/s3-commons/build.gradle.kts index 0e3d825aa..5e54c05ef 100644 --- a/s3-commons/build.gradle.kts +++ b/s3-commons/build.gradle.kts @@ -18,10 +18,13 @@ plugins { id("aiven-apache-kafka-connectors-all.java-conventions") } val amazonS3Version by extra("1.12.777") val amazonSTSVersion by extra("1.12.777") +val amazonV2Version by extra("2.29.34") dependencies { implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion") + implementation("software.amazon.awssdk:auth:$amazonV2Version") + implementation("software.amazon.awssdk:sts:$amazonV2Version") implementation(project(":commons")) diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java 
b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 1e87265b9..672409565 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -41,11 +41,13 @@ import com.amazonaws.services.s3.internal.BucketNameUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; /** * The configuration fragment that defines the S3 specific characteristics. */ -@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports" }) +@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports", "PMD.GodClass" }) public final class S3ConfigFragment extends ConfigFragment { private static final Logger LOGGER = LoggerFactory.getLogger(S3ConfigFragment.class); @@ -345,7 +347,8 @@ public void validateCredentials() { } } else { final BasicAWSCredentials awsCredentials = getAwsCredentials(); - if (awsCredentials == null) { + final AwsBasicCredentials awsV2Credentials = getAwsV2Credentials(); + if (awsCredentials == null && awsV2Credentials == null) { LOGGER.info( "Connector use {} as credential Provider, " + "when configuration for {{}, {}} OR {{}, {}} are absent", @@ -410,11 +413,13 @@ public AwsStsEndpointConfig getStsEndpointConfig() { return new AwsStsEndpointConfig(cfg.getString(AWS_STS_CONFIG_ENDPOINT), cfg.getString(AWS_S3_REGION_CONFIG)); } + @Deprecated public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { final AwsStsEndpointConfig config = getStsEndpointConfig(); return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion()); } + @Deprecated public BasicAWSCredentials getAwsCredentials() { if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) && 
Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { @@ -430,12 +435,26 @@ public BasicAWSCredentials getAwsCredentials() { return null; } + public AwsBasicCredentials getAwsV2Credentials() { + if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) + && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { + + return AwsBasicCredentials.create(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(), + cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value()); + } else if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID)) + && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY))) { + LOGGER.warn("Config options {} and {} are not supported for this Connector", AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY); + } + return null; + } + public String getAwsS3EndPoint() { return Objects.nonNull(cfg.getString(AWS_S3_ENDPOINT_CONFIG)) ? cfg.getString(AWS_S3_ENDPOINT_CONFIG) : cfg.getString(AWS_S3_ENDPOINT); } - + @Deprecated public Region getAwsS3Region() { // we have priority of properties if old one not set or both old and new one set // the new property value will be selected @@ -448,6 +467,18 @@ public Region getAwsS3Region() { } } + public software.amazon.awssdk.regions.Region getAwsV2S3Region() { + // we have priority of properties if old one not set or both old and new one set + // the new property value will be selected + if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) { + return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION_CONFIG)); + } else if (Objects.nonNull(cfg.getString(AWS_S3_REGION))) { + return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION)); + } else { + return software.amazon.awssdk.regions.Region.of(Regions.US_EAST_1.getName()); + } + } + public String getAwsS3BucketName() { return Objects.nonNull(cfg.getString(AWS_S3_BUCKET_NAME_CONFIG)) ? 
cfg.getString(AWS_S3_BUCKET_NAME_CONFIG) @@ -484,6 +515,10 @@ public AWSCredentialsProvider getCustomCredentialsProvider() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); } + public AwsCredentialsProvider getCustomV2CredentialsProvider() { + return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class); + } + public int getFetchPageSize() { return cfg.getInt(FETCH_PAGE_SIZE); } diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java index 2a5089726..52e7d4b91 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -26,6 +26,11 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; public class AwsCredentialProviderFactory { @@ -58,4 +63,33 @@ private AWSSecurityTokenService securityTokenService(final S3ConfigFragment conf } return AWSSecurityTokenServiceClientBuilder.defaultClient(); } + + public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) { + + if (config.hasAwsStsRole()) { + return getV2StsProvider(config); + } + final AwsBasicCredentials awsCredentials = config.getAwsV2Credentials(); + if (Objects.isNull(awsCredentials)) { + return 
config.getCustomV2CredentialsProvider(); + } + return StaticCredentialsProvider.create(awsCredentials); + + } + + private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) { + if (config.hasAwsStsRole()) { + return StsAssumeRoleCredentialsProvider.builder() + .refreshRequest(() -> AssumeRoleRequest.builder() + .roleArn(config.getStsRole().getArn()) + // Maker this a unique identifier + .roleSessionName("AwsV2SDKConnectorSession") + .build()) + .build(); + } + + return StsAssumeRoleCredentialsProvider.builder().build(); + + } + } diff --git a/s3-source-connector/build.gradle.kts b/s3-source-connector/build.gradle.kts index 3530724e0..cfa5f6514 100644 --- a/s3-source-connector/build.gradle.kts +++ b/s3-source-connector/build.gradle.kts @@ -18,8 +18,8 @@ import com.github.spotbugs.snom.SpotBugsTask plugins { id("aiven-apache-kafka-connectors-all.java-conventions") } -val amazonS3Version by extra("1.12.729") -val amazonSTSVersion by extra("1.12.729") +val amazonS3Version by extra("2.29.34") +val amazonSTSVersion by extra("2.29.34") val s3mockVersion by extra("0.2.6") val testKafkaVersion by extra("3.7.1") @@ -67,8 +67,9 @@ dependencies { implementation(project(":commons")) implementation(project(":s3-commons")) - implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") - implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion") + implementation("software.amazon.awssdk:s3:$amazonS3Version") + implementation("software.amazon.awssdk:sts:$amazonSTSVersion") + testImplementation("software.amazon.awssdk:url-connection-client:$amazonSTSVersion") implementation(tools.spotbugs.annotations) implementation(logginglibs.slf4j) @@ -154,7 +155,6 @@ dependencies { exclude(group = "org.apache.commons", module = "commons-math3") exclude(group = "org.apache.httpcomponents", module = "httpclient") exclude(group = "commons-codec", module = "commons-codec") - exclude(group = "commons-io", module = "commons-io") exclude(group = "commons-net", 
module = "commons-net") exclude(group = "org.eclipse.jetty") exclude(group = "org.eclipse.jetty.websocket") diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 9ce09172b..6b505b996 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.ServerSocket; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; @@ -47,11 +48,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.json.JsonDeserializer; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -61,6 +57,10 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; public interface IntegrationBase { String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; @@ -101,13 +101,13 @@ static void waitForRunningContainer(final Container container) { 
await().atMost(Duration.ofMinutes(1)).until(container::isRunning); } - static AmazonS3 createS3Client(final LocalStackContainer localStackContainer) { - return AmazonS3ClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString(), - localStackContainer.getRegion())) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials( - localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) + static S3Client createS3Client(final LocalStackContainer localStackContainer) { + return S3Client.builder() + .endpointOverride( + URI.create(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials + .create(localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) .build(); } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 7f96842f3..884051e30 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -33,7 +33,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -62,9 +61,6 @@ import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.testutils.ContentUtils; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ObjectMetadata; -import 
com.amazonaws.services.s3.model.PutObjectRequest; import com.fasterxml.jackson.databind.JsonNode; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -83,6 +79,9 @@ import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; @Testcontainers @SuppressWarnings("PMD.ExcessiveImports") @@ -111,7 +110,7 @@ final class IntegrationTest implements IntegrationBase { private AdminClient adminClient; private ConnectRunner connectRunner; - private static AmazonS3 s3Client; + private static S3Client s3Client; @BeforeAll static void setUpAll() throws IOException, InterruptedException { @@ -263,7 +262,7 @@ void parquetTest(final TestInfo testInfo) throws IOException { final Path path = ContentUtils.getTmpFilePath(name); try { - s3Client.putObject(TEST_BUCKET_NAME, fileName, Files.newInputStream(path), null); + s3Client.putObject(PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(fileName).build(), path); } catch (final Exception e) { // NOPMD broad exception caught LOGGER.error("Error in reading file {}", e.getMessage(), e); } finally { @@ -341,9 +340,8 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) { final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; - final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey, - new ByteArrayInputStream(testDataBytes), new ObjectMetadata()); - s3Client.putObject(request); + final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build(); + 
s3Client.putObject(request, RequestBody.fromBytes(testDataBytes)); return OBJECT_KEY + SEPARATOR + objectKey; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index aa331b4aa..320fa19cb 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -40,10 +40,10 @@ import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; import io.aiven.kafka.connect.s3.source.utils.Version; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AmazonS3Exception; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; /** * S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka @@ -64,7 +64,7 @@ public class S3SourceTask extends SourceTask { private static final long ERROR_BACKOFF = 1000L; private S3SourceConfig s3SourceConfig; - private AmazonS3 s3Client; + private S3Client s3Client; private Iterator sourceRecordIterator; private Transformer transformer; @@ -122,8 +122,8 @@ public List poll() throws InterruptedException { extractSourceRecords(results); LOGGER.info("Number of records extracted and sent: {}", results.size()); return results; - } catch (AmazonS3Exception exception) { - if (exception.isRetryable()) { + } catch (SdkException exception) { + if (exception.retryable()) { LOGGER.warn("Retryable error encountered during polling. 
Waiting before retrying...", exception); pollLock.wait(ERROR_BACKOFF); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java index 346ec5825..31cd6191c 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java @@ -16,45 +16,48 @@ package io.aiven.kafka.connect.s3.source.config; +import java.net.URI; import java.util.Objects; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; -import com.amazonaws.PredefinedClientConfigurations; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.retry.PredefinedBackoffStrategies; -import com.amazonaws.retry.PredefinedRetryPolicies; -import com.amazonaws.retry.RetryPolicy; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; public class S3ClientFactory { private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); - public AmazonS3 createAmazonS3Client(final S3SourceConfig config) { - final var awsEndpointConfig = newEndpointConfiguration(config); - final var clientConfig = PredefinedClientConfigurations.defaultConfig() - .withRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, - new PredefinedBackoffStrategies.FullJitterBackoffStrategy( - Math.toIntExact(config.getS3RetryBackoffDelayMs()), - Math.toIntExact(config.getS3RetryBackoffMaxDelayMs())), - config.getS3RetryBackoffMaxRetries(), false)); - final var s3ClientBuilder = 
AmazonS3ClientBuilder.standard() - .withCredentials(credentialFactory.getProvider(config.getS3ConfigFragment())) - .withClientConfiguration(clientConfig); - if (Objects.isNull(awsEndpointConfig)) { - s3ClientBuilder.withRegion(config.getAwsS3Region().getName()); - } else { - s3ClientBuilder.withEndpointConfiguration(awsEndpointConfig).withPathStyleAccessEnabled(true); - } - return s3ClientBuilder.build(); - } + public S3Client createAmazonS3Client(final S3SourceConfig config) { + + // EndpointConfiguration is no longer used in SDK 2.X + // TODO Review back off strategy + // final BackoffStrategy backoffStrategy = + // BackoffStrategy.exponentialDelayWithoutJitter(Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffDelayMs())), + // Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffMaxDelayMs()))); - private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(final S3SourceConfig config) { + final ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder() + .retryStrategy(RetryMode.STANDARD) + .build(); if (Objects.isNull(config.getAwsS3EndPoint())) { - return null; + return S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .region(config.getAwsS3Region()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .build(); + } else { + // TODO This is definitely used for testing but not sure if customers use it. 
+ return S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .region(config.getAwsS3Region()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .endpointOverride(URI.create(config.getAwsS3EndPoint())) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build(); } - return new AwsClientBuilder.EndpointConfiguration(config.getAwsS3EndPoint(), config.getAwsS3Region().getName()); + } + } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index 68b9b2f98..d92f448ce 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -32,12 +32,10 @@ import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Region; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.regions.Region; final public class S3SourceConfig extends SourceCommonConfig { @@ -87,12 +85,8 @@ public AwsStsEndpointConfig getStsEndpointConfig() { return s3ConfigFragment.getStsEndpointConfig(); } - public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { - return s3ConfigFragment.getAwsEndpointConfiguration(); - } - - public BasicAWSCredentials getAwsCredentials() { - return s3ConfigFragment.getAwsCredentials(); + public AwsBasicCredentials getAwsCredentials() { + return s3ConfigFragment.getAwsV2Credentials(); } public String getAwsS3EndPoint() { @@ -100,7 +94,7 @@ 
public String getAwsS3EndPoint() { } public Region getAwsS3Region() { - return s3ConfigFragment.getAwsS3Region(); + return s3ConfigFragment.getAwsV2S3Region(); } public String getAwsS3BucketName() { @@ -131,10 +125,6 @@ public int getS3RetryBackoffMaxRetries() { return s3ConfigFragment.getS3RetryBackoffMaxRetries(); } - public AWSCredentialsProvider getCustomCredentialsProvider() { - return s3ConfigFragment.getCustomCredentialsProvider(); - } - public S3ConfigFragment getS3ConfigFragment() { return s3ConfigFragment; } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 1bbc477ee..377d7d949 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source.utils; +import java.io.InputStream; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; @@ -26,11 +27,14 @@ import io.aiven.kafka.connect.s3.source.config.S3ClientFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.io.function.IOSupplier; import org.codehaus.plexus.util.StringUtils; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.S3Object; /** * Called AWSV2SourceClient 
as this source client implements the V2 version of the aws client library. Handles all calls @@ -40,10 +44,10 @@ public class AWSV2SourceClient { public static final int PAGE_SIZE_FACTOR = 2; private final S3SourceConfig s3SourceConfig; - private final AmazonS3 s3Client; + private final S3Client s3Client; private final String bucketName; - private Predicate filterPredicate = summary -> summary.getSize() > 0; + private Predicate filterPredicate = s3Object -> s3Object.size() > 0; private final Set failedObjectKeys; /** @@ -70,7 +74,7 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set * @param failedObjectKeys * all objectKeys which have already been tried but have been unable to process. */ - AWSV2SourceClient(final AmazonS3 s3Client, final S3SourceConfig s3SourceConfig, + AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { this.s3SourceConfig = s3SourceConfig; this.s3Client = s3Client; @@ -79,46 +83,52 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set } public Iterator getListOfObjectKeys(final String startToken) { - final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName) - .withMaxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR); - - if (StringUtils.isNotBlank(startToken)) { - request.withStartAfter(startToken); - } - // Prefix is optional so only use if supplied - if (StringUtils.isNotBlank(s3SourceConfig.getAwsS3Prefix())) { - request.withPrefix(s3SourceConfig.getAwsS3Prefix()); - } + final ListObjectsV2Request request = ListObjectsV2Request.builder() + .bucket(bucketName) + .maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) + .prefix(optionalKey(s3SourceConfig.getAwsS3Prefix())) + .startAfter(optionalKey(startToken)) + .build(); final Stream s3ObjectKeyStream = Stream .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> { // This is called 
every time next() is called on the iterator. if (response.isTruncated()) { - return s3Client.listObjectsV2( - new ListObjectsV2Request().withContinuationToken(response.getNextContinuationToken())); + return s3Client.listObjectsV2(ListObjectsV2Request.builder() + .maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR) + .continuationToken(response.nextContinuationToken()) + .build()); } else { return null; } }) - .flatMap(response -> response.getObjectSummaries() + .flatMap(response -> response.contents() .stream() .filter(filterPredicate) - .filter(objectSummary -> assignObjectToTask(objectSummary.getKey())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()))) - .map(S3ObjectSummary::getKey); + .filter(objectSummary -> assignObjectToTask(objectSummary.key())) + .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key()))) + .map(S3Object::key); return s3ObjectKeyStream.iterator(); } - - public S3Object getObject(final String objectKey) { - return s3Client.getObject(bucketName, objectKey); + private String optionalKey(final String key) { + if (StringUtils.isNotBlank(key)) { + return key; + } + return null; + } + // TODO is the response closeable or autocloseable + public IOSupplier getObject(final String objectKey) { + final GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(); + final ResponseBytes s3ObjectResponse = s3Client.getObjectAsBytes(getObjectRequest); + return s3ObjectResponse::asInputStream; } public void addFailedObjectKeys(final String objectKey) { this.failedObjectKeys.add(objectKey); } - public void setFilterPredicate(final Predicate predicate) { + public void setFilterPredicate(final Predicate predicate) { filterPredicate = predicate; } @@ -130,7 +140,7 @@ private boolean assignObjectToTask(final String objectKey) { } public void shutdown() { - s3Client.shutdown(); + s3Client.close(); } } diff --git 
a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index ac5a3061a..32f48eba4 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -17,6 +17,8 @@ package io.aiven.kafka.connect.s3.source.utils; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -32,10 +34,10 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.model.S3Object; +import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; /** * Iterator that processes S3 files and creates Kafka source records. 
Supports different output formats (Avro, JSON, @@ -91,7 +93,7 @@ private void nextS3Object() { recordIterator = createIteratorForCurrentFile(); } } catch (IOException e) { - throw new AmazonClientException(e); + throw SdkException.create(e.getMessage(), e.getCause()); } } @@ -103,20 +105,20 @@ private Iterator createIteratorForCurrentFile() throws IOExcepti if (fileMatcher.find()) { // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - try (S3Object s3Object = sourceClient.getObject(currentObjectKey);) { - topicName = fileMatcher.group(PATTERN_TOPIC_KEY); - defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); + final IOSupplier s3Object = sourceClient.getObject(currentObjectKey); + topicName = fileMatcher.group(PATTERN_TOPIC_KEY); + defaultPartitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - final long defaultStartOffsetId = 1L; + final long defaultStartOffsetId = 1L; - final String finalTopic = topicName; - final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, - bucketName); + final String finalTopic = topicName; + final Map partitionMap = ConnectUtils.getPartitionMap(topicName, defaultPartitionId, + bucketName); + + return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, + partitionMap); - return getObjectIterator(s3Object, finalTopic, defaultPartitionId, defaultStartOffsetId, transformer, - partitionMap); - } } else { LOGGER.error("File naming doesn't match to any topic. 
{}", currentObjectKey); return Collections.emptyIterator(); @@ -124,7 +126,7 @@ private Iterator createIteratorForCurrentFile() throws IOExcepti } @SuppressWarnings("PMD.CognitiveComplexity") - private Iterator getObjectIterator(final S3Object s3Object, final String topic, + private Iterator getObjectIterator(final IOSupplier s3Object, final String topic, final int topicPartition, final long startOffset, final Transformer transformer, final Map partitionMap) { return new Iterator<>() { @@ -142,8 +144,11 @@ private List readNext() { return sourceRecords; } - try (Stream recordStream = transformer.getRecords(s3Object::getObjectContent, topic, - topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) { + + try (Stream recordStream = transformer.getRecords(s3Object, topic, topicPartition, + s3SourceConfig, numberOfRecsAlreadyProcessed)) { + + final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8); final Iterator recordIterator = recordStream.iterator(); while (recordIterator.hasNext()) { final Object record = recordIterator.next(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 590ad23bb..b4bb918ac 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -22,8 +22,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,15 +42,12 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import 
io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; +import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import io.findify.s3mock.S3Mock; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -57,6 +57,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.utils.AttributeMap; @ExtendWith(MockitoExtension.class) final class S3SourceTaskTest { @@ -66,9 +72,10 @@ final class S3SourceTaskTest { private static BucketAccessor testBucketAccessor; private static final String TEST_BUCKET = "test-bucket"; - + // TODO S3Mock has not been maintained in 4 years + // Adobe have an alternative we can move to. 
private static S3Mock s3Api; - private static AmazonS3 s3Client; + private static S3Client s3Client; private static Map commonProperties; @@ -79,7 +86,7 @@ final class S3SourceTaskTest { private OffsetStorageReader mockedOffsetStorageReader; @BeforeAll - public static void setUpClass() { + public static void setUpClass() throws URISyntaxException { final int s3Port = RANDOM.nextInt(10_000) + 10_000; s3Api = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build(); @@ -90,17 +97,21 @@ public static void setUpClass() { S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET, S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "http://localhost:" + s3Port, S3ConfigFragment.AWS_S3_REGION_CONFIG, "us-west-2"); - final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); - final BasicAWSCredentials awsCreds = new BasicAWSCredentials( - commonProperties.get(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG), - commonProperties.get(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG)); - builder.withCredentials(new AWSStaticCredentialsProvider(awsCreds)); - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - commonProperties.get(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG), - commonProperties.get(S3ConfigFragment.AWS_S3_REGION_CONFIG))); - builder.withPathStyleAccessEnabled(true); - - s3Client = builder.build(); + final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); + final S3SourceConfig config = new S3SourceConfig(commonProperties); + final ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder() + .retryStrategy(RetryMode.STANDARD) + .build(); + + s3Client = S3Client.builder() + .overrideConfiguration(clientOverrideConfiguration) + .region(config.getAwsS3Region()) + .endpointOverride(URI.create(config.getAwsS3EndPoint())) + .httpClient(UrlConnectionHttpClient.builder() + .buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) + 
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) + .build(); testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET); testBucketAccessor.createBucket(); @@ -114,14 +125,14 @@ public static void tearDownClass() { @BeforeEach public void setUp() { properties = new HashMap<>(commonProperties); - s3Client.createBucket(TEST_BUCKET); + s3Client.createBucket(create -> create.bucket(TEST_BUCKET).build()); mockedSourceTaskContext = mock(SourceTaskContext.class); mockedOffsetStorageReader = mock(OffsetStorageReader.class); } @AfterEach public void tearDown() { - s3Client.deleteBucket(TEST_BUCKET); + s3Client.deleteBucket(delete -> delete.bucket(TEST_BUCKET).build()); } @Test diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index edbe8dc98..35b0b5ead 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -27,9 +27,9 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; final class S3SourceConfigTest { @Test @@ -53,11 +53,11 @@ void correctFullConfig() { final var conf = new S3SourceConfig(props); final var awsCredentials = conf.getAwsCredentials(); - assertThat(awsCredentials.getAWSAccessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); - assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); + assertThat(awsCredentials.accessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); + 
assertThat(awsCredentials.secretAccessKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket"); assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT"); - assertThat(conf.getAwsS3Region()).isEqualTo(RegionUtils.getRegion("us-east-1")); + assertThat(conf.getAwsS3Region()).isEqualTo(Region.of("us-east-1")); assertThat(conf.getInputFormat()).isEqualTo(InputFormat.AVRO); assertThat(conf.getTargetTopics()).isEqualTo("testtopic"); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java index 212088560..58c956d87 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java @@ -32,58 +32,73 @@ import io.aiven.kafka.connect.common.config.CompressionType; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.MultiObjectDeleteException; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.github.luben.zstd.ZstdInputStream; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xerial.snappy.SnappyInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import 
software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Object; public class BucketAccessor { private final String bucketName; - private final AmazonS3 s3Client; + private final S3Client s3Client; private static final Logger LOGGER = LoggerFactory.getLogger(BucketAccessor.class); @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "stores mutable s3Client object") - public BucketAccessor(final AmazonS3 s3Client, final String bucketName) { + public BucketAccessor(final S3Client s3Client, final String bucketName) { this.bucketName = bucketName; this.s3Client = s3Client; } public final void createBucket() { - s3Client.createBucket(bucketName); + s3Client.createBucket(builder -> builder.bucket(bucketName).build()); } public final void removeBucket() { - final var chunk = s3Client.listObjects(bucketName) - .getObjectSummaries() + final var deleteIds = s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).build()) + .contents() .stream() - .map(S3ObjectSummary::getKey) - .toArray(String[]::new); + .map(S3Object::key) + .map(key -> ObjectIdentifier.builder().key(key).build()) + .collect(Collectors.toList()); - final var deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(chunk); try { - s3Client.deleteObjects(deleteObjectsRequest); + s3Client.deleteObjects(DeleteObjectsRequest.builder() + .bucket(bucketName) + .delete(Delete.builder().objects(deleteIds).build()) + .build()); } catch (final MultiObjectDeleteException e) { for (final var err : e.getErrors()) { LOGGER.warn(String.format("Couldn't delete object: %s. 
Reason: [%s] %s", err.getKey(), err.getCode(), err.getMessage())); } - } catch (final AmazonClientException e) { - LOGGER.error("Couldn't delete objects: {}", - Arrays.stream(chunk).reduce(" ", String::concat) + e.getMessage()); + } catch (final SdkException e) { + + LOGGER.error("Couldn't delete objects: {}, Exception{} ", deleteIds, e.getMessage()); } - s3Client.deleteBucket(bucketName); + s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()); } + // TODO NOT Currently used public final Boolean doesObjectExist(final String objectName) { - return s3Client.doesObjectExist(bucketName, objectName); + try { + s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(objectName).build()); + return true; + } catch (NoSuchKeyException e) { + return false; + } } public final List> readAndDecodeLines(final String blobName, final String compression, @@ -104,7 +119,8 @@ private List> readAndDecodeLines0(final String blobName, final Stri public final byte[] readBytes(final String blobName, final String compression) throws IOException { Objects.requireNonNull(blobName, "blobName cannot be null"); - final byte[] blobBytes = s3Client.getObject(bucketName, blobName).getObjectContent().readAllBytes(); + final byte[] blobBytes = s3Client.getObjectAsBytes(builder -> builder.key(blobName).bucket(bucketName).build()) + .asByteArray(); try (ByteArrayInputStream bais = new ByteArrayInputStream(blobBytes); InputStream decompressedStream = getDecompressedStream(bais, compression); ByteArrayOutputStream decompressedBytes = new ByteArrayOutputStream()) { @@ -135,10 +151,11 @@ public final List readLines(final String blobName, final String compress } public final List listObjects() { - return s3Client.listObjects(bucketName) - .getObjectSummaries() + + return s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucketName).build()) + .contents() .stream() - .map(S3ObjectSummary::getKey) + .map(S3Object::key) .collect(Collectors.toList()); } diff 
--git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index a8174a15c..beed0681c 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -34,19 +34,19 @@ import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.S3ObjectSummary; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; class AWSV2SourceClientTest { - private AmazonS3 s3Client; + private S3Client s3Client; private AWSV2SourceClient awsv2SourceClient; @@ -66,8 +66,8 @@ private static Map getConfigMap(final int maxTasks, final int ta @CsvSource({ "3, 1" }) void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) { initializeWithTaskConfigs(maxTasks, taskId); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result(Collections.emptyList(), null); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + final ListObjectsV2Response listObjectsV2Response = createListObjectsV2Response(Collections.emptyList(), null); + 
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); assertThat(summaries).isExhausted(); @@ -107,8 +107,8 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final @CsvSource({ "4, 3", "4, 0" }) void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) { initializeWithTaskConfigs(maxTasks, taskId); - final ListObjectsV2Result listObjectsV2Result = getListObjectsV2Result(); - when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); + final ListObjectsV2Response listObjectsV2Response = getListObjectsV2Response(); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); @@ -121,13 +121,13 @@ void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int ta @Test void testFetchObjectSummariesWithPagination() throws IOException { initializeWithTaskConfigs(4, 3); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(2, "key2"); - final List firstBatch = List.of(object1); - final List secondBatch = List.of(object2); + final S3Object object1 = createObjectSummary(1, "key1"); + final S3Object object2 = createObjectSummary(2, "key2"); + final List firstBatch = List.of(object1); + final List secondBatch = List.of(object2); - final ListObjectsV2Result firstResult = createListObjectsV2Result(firstBatch, "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(secondBatch, null); + final ListObjectsV2Response firstResult = createListObjectsV2Response(firstBatch, "nextToken"); + final ListObjectsV2Response secondResult = createListObjectsV2Response(secondBatch, null); 
when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); @@ -142,14 +142,14 @@ void testFetchObjectWithPrefix() { final Map configMap = getConfigMap(1, 0); configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - s3Client = mock(AmazonS3.class); + s3Client = mock(S3Client.class); awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "key1"); + final S3Object object2 = createObjectSummary(1, "key2"); - final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); + final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); + final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); @@ -163,10 +163,10 @@ void testFetchObjectWithPrefix() { final List allRequests = requestCaptor.getAllValues(); assertThat(summaries).isExhausted(); - assertThat(allRequests.get(0).getPrefix()).isEqualTo(s3SourceConfig.getAwsS3Prefix()); + assertThat(allRequests.get(0).prefix()).isEqualTo(s3SourceConfig.getAwsS3Prefix()); // Not required with continuation token - assertThat(allRequests.get(1).getPrefix()).isNull(); - assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken"); + assertThat(allRequests.get(1).prefix()).isNull(); + assertThat(allRequests.get(1).continuationToken()).isEqualTo("nextToken"); } @@ -175,14 +175,14 @@ void 
testFetchObjectWithInitialStartAfter() { final Map configMap = getConfigMap(1, 0); final String startAfter = "file-option-1-12000.txt"; final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - s3Client = mock(AmazonS3.class); + s3Client = mock(S3Client.class); awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3ObjectSummary object1 = createObjectSummary(1, "key1"); - final S3ObjectSummary object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "key1"); + final S3Object object2 = createObjectSummary(1, "key2"); - final ListObjectsV2Result firstResult = createListObjectsV2Result(List.of(object1), "nextToken"); - final ListObjectsV2Result secondResult = createListObjectsV2Result(List.of(object2), null); + final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); + final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(firstResult).thenReturn(secondResult); @@ -196,32 +196,31 @@ void testFetchObjectWithInitialStartAfter() { final List allRequests = requestCaptor.getAllValues(); assertThat(summaries).isExhausted(); - assertThat(allRequests.get(0).getStartAfter()).isEqualTo(startAfter); + assertThat(allRequests.get(0).startAfter()).isEqualTo(startAfter); // Not required with continuation token - assertThat(allRequests.get(1).getStartAfter()).isNull(); - assertThat(allRequests.get(1).getContinuationToken()).isEqualTo("nextToken"); + assertThat(allRequests.get(1).startAfter()).isNull(); + assertThat(allRequests.get(1).continuationToken()).isEqualTo("nextToken"); } - private ListObjectsV2Result createListObjectsV2Result(final List summaries, - final String nextToken) { - final ListObjectsV2Result result = mock(ListObjectsV2Result.class); - 
when(result.getObjectSummaries()).thenReturn(summaries); - when(result.getNextContinuationToken()).thenReturn(nextToken); + private ListObjectsV2Response createListObjectsV2Response(final List summaries, final String nextToken) { + final ListObjectsV2Response result = mock(ListObjectsV2Response.class); + when(result.contents()).thenReturn(summaries); + when(result.nextContinuationToken()).thenReturn(nextToken); when(result.isTruncated()).thenReturn(nextToken != null); return result; } - private S3ObjectSummary createObjectSummary(final long sizeOfObject, final String objectKey) { - final S3ObjectSummary summary = mock(S3ObjectSummary.class); - when(summary.getSize()).thenReturn(sizeOfObject); - when(summary.getKey()).thenReturn(objectKey); + private S3Object createObjectSummary(final long sizeOfObject, final String objectKey) { + final S3Object summary = mock(S3Object.class); + when(summary.size()).thenReturn(sizeOfObject); + when(summary.key()).thenReturn(objectKey); return summary; } private Iterator getS3ObjectKeysIterator(final String objectKey) { - final S3ObjectSummary objectSummary = createObjectSummary(1, objectKey); - final ListObjectsV2Result listObjectsV2Result = createListObjectsV2Result( + final S3Object objectSummary = createObjectSummary(1, objectKey); + final ListObjectsV2Response listObjectsV2Result = createListObjectsV2Response( Collections.singletonList(objectSummary), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Result); @@ -231,15 +230,15 @@ private Iterator getS3ObjectKeysIterator(final String objectKey) { public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { final Map configMap = getConfigMap(maxTasks, taskId); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); - s3Client = mock(AmazonS3.class); + s3Client = mock(S3Client.class); awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); } - private ListObjectsV2Result 
getListObjectsV2Result() { - final S3ObjectSummary zeroByteObject = createObjectSummary(0, "key1"); - final S3ObjectSummary nonZeroByteObject1 = createObjectSummary(1, "key2"); - final S3ObjectSummary nonZeroByteObject2 = createObjectSummary(1, "key3"); - return createListObjectsV2Result(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); + private ListObjectsV2Response getListObjectsV2Response() { + final S3Object zeroByteObject = createObjectSummary(0, "key1"); + final S3Object nonZeroByteObject1 = createObjectSummary(1, "key2"); + final S3Object nonZeroByteObject2 = createObjectSummary(1, "key3"); + return createListObjectsV2Response(List.of(zeroByteObject, nonZeroByteObject1, nonZeroByteObject2), null); } } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 61d8170f7..980b44359 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -38,7 +38,6 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,12 +63,10 @@ void testIteratorProcessesS3Objects() throws Exception { final String key = "topic-00001-abc123.txt"; - // Mock S3Object and InputStream - try (S3Object mockS3Object = mock(S3Object.class); - S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { - when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object); - 
when(mockS3Object.getObjectContent()).thenReturn(mockInputStream); + // Mock InputStream + try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), + null);) { + when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Stream.of(new Object())); @@ -98,12 +95,10 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { final String key = "topic-00001-abc123.txt"; - // Mock S3Object and InputStream - try (S3Object mockS3Object = mock(S3Object.class); - S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { - when(mockSourceApiClient.getObject(anyString())).thenReturn(mockS3Object); - when(mockS3Object.getObjectContent()).thenReturn(mockInputStream); + // Mock InputStream + try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), + null);) { + when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); // With ByteArrayTransformer mockTransformer = mock(ByteArrayTransformer.class); From 2d665e5816be5a64abc351d4369d657d3adf5e12 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 17 Dec 2024 09:17:12 +0000 Subject: [PATCH 2/5] Update following review of open TODOs Signed-off-by: Aindriu Lavelle --- s3-source-connector/build.gradle.kts | 1 - .../kafka/connect/s3/source/utils/AWSV2SourceClient.java | 2 +- .../io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java | 5 ----- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/s3-source-connector/build.gradle.kts b/s3-source-connector/build.gradle.kts index cfa5f6514..20d5a3b82 100644 --- a/s3-source-connector/build.gradle.kts +++ b/s3-source-connector/build.gradle.kts @@ -69,7 +69,6 @@ dependencies { implementation(project(":s3-commons")) 
implementation("software.amazon.awssdk:s3:$amazonS3Version") implementation("software.amazon.awssdk:sts:$amazonSTSVersion") - testImplementation("software.amazon.awssdk:url-connection-client:$amazonSTSVersion") implementation(tools.spotbugs.annotations) implementation(logginglibs.slf4j) diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index 377d7d949..44e28dfa7 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -117,7 +117,7 @@ private String optionalKey(final String key) { } return null; } - // TODO is the response closeable or autocloseable + public IOSupplier getObject(final String objectKey) { final GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucketName).key(objectKey).build(); final ResponseBytes s3ObjectResponse = s3Client.getObjectAsBytes(getObjectRequest); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index b4bb918ac..13ac66844 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES; import java.lang.reflect.Field; import java.net.URI; @@ -59,10 +58,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import 
software.amazon.awssdk.core.retry.RetryMode; -import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; -import software.amazon.awssdk.utils.AttributeMap; @ExtendWith(MockitoExtension.class) final class S3SourceTaskTest { @@ -107,8 +104,6 @@ public static void setUpClass() throws URISyntaxException { .overrideConfiguration(clientOverrideConfiguration) .region(config.getAwsS3Region()) .endpointOverride(URI.create(config.getAwsS3EndPoint())) - .httpClient(UrlConnectionHttpClient.builder() - .buildWithDefaults(AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, Boolean.TRUE).build())) .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) .build(); From adebf34b7b3bd5b2280239ef7140a3abd6ef2bf1 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 17 Dec 2024 09:31:10 +0000 Subject: [PATCH 3/5] Add backoff strategy with jitter Signed-off-by: Aindriu Lavelle --- .../connect/s3/source/config/S3ClientFactory.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java index 31cd6191c..451311cd2 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java @@ -17,12 +17,15 @@ package io.aiven.kafka.connect.s3.source.config; import java.net.URI; +import java.time.Duration; import java.util.Objects; +import java.util.Random; import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import 
software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.retries.api.internal.backoff.ExponentialDelayWithJitter; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; @@ -32,11 +35,9 @@ public class S3ClientFactory { public S3Client createAmazonS3Client(final S3SourceConfig config) { - // EndpointConfiguration is no longer used in SDK 2.X - // TODO Review back off strategy - // final BackoffStrategy backoffStrategy = - // BackoffStrategy.exponentialDelayWithoutJitter(Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffDelayMs())), - // Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffMaxDelayMs()))); + final ExponentialDelayWithJitter backoffStrategy = new ExponentialDelayWithJitter(Random::new, + Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffDelayMs())), + Duration.ofMillis(Math.toIntExact(config.getS3RetryBackoffMaxDelayMs()))); final ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder() .retryStrategy(RetryMode.STANDARD) @@ -44,6 +45,7 @@ public S3Client createAmazonS3Client(final S3SourceConfig config) { if (Objects.isNull(config.getAwsS3EndPoint())) { return S3Client.builder() .overrideConfiguration(clientOverrideConfiguration) + .overrideConfiguration(o -> o.retryStrategy(r -> r.backoffStrategy(backoffStrategy))) .region(config.getAwsS3Region()) .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) .build(); From 65c60e4f9aba91c9b45e0a993ab7b9e6b5dfc4f5 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Tue, 17 Dec 2024 10:02:46 +0000 Subject: [PATCH 4/5] Additional check to make sure all old com.amazon entries are now removed from the source connector Signed-off-by: Aindriu Lavelle --- .../s3/source/utils/SourceRecordIterator.java | 3 - .../s3/source/config/S3SourceConfigTest.java | 3 +- .../s3/source/testutils/BucketAccessor.java | 10 +- 
.../s3/source/testutils/S3OutputStream.java | 181 ------------------ .../utils/SourceRecordIteratorTest.java | 8 +- 5 files changed, 8 insertions(+), 197 deletions(-) delete mode 100644 s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 32f48eba4..26f3c03cf 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -144,11 +143,9 @@ private List readNext() { return sourceRecords; } - try (Stream recordStream = transformer.getRecords(s3Object, topic, topicPartition, s3SourceConfig, numberOfRecsAlreadyProcessed)) { - final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8); final Iterator recordIterator = recordStream.iterator(); while (recordIterator.hasNext()) { final Object record = recordIterator.next(); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 35b0b5ead..10939c511 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -27,7 +27,6 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; -import com.amazonaws.regions.Regions; import 
org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; @@ -42,7 +41,7 @@ void correctFullConfig() { props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket"); props.put(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT"); props.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX"); - props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); + props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id()); // record, topic specific props props.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java index 58c956d87..8b34f73d0 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/BucketAccessor.java @@ -32,7 +32,6 @@ import io.aiven.kafka.connect.common.config.CompressionType; -import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.github.luben.zstd.ZstdInputStream; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; @@ -47,6 +46,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; public class BucketAccessor { @@ -79,11 +79,9 @@ public final void removeBucket() { .bucket(bucketName) .delete(Delete.builder().objects(deleteIds).build()) .build()); - } catch (final MultiObjectDeleteException e) { - for (final var err : e.getErrors()) { - LOGGER.warn(String.format("Couldn't delete object: %s. 
Reason: [%s] %s", err.getKey(), err.getCode(), - err.getMessage())); - } + } catch (final S3Exception e) { + LOGGER.warn( + String.format("Couldn't delete objects. Reason: [%s] %s", e.awsErrorDetails().errorMessage(), e)); } catch (final SdkException e) { LOGGER.error("Couldn't delete objects: {}, Exception{} ", deleteIds, e.getMessage()); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java deleted file mode 100644 index 4d33e46c5..000000000 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/testutils/S3OutputStream.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.aiven.kafka.connect.s3.source.testutils; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; -import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartRequest; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class S3OutputStream extends OutputStream { - - private final Logger logger = LoggerFactory.getLogger(S3OutputStream.class); - - public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; // 5 MB - - private final AmazonS3 client; - - private final ByteBuffer byteBuffer; - - private final String bucketName; - - private final String key; - - private MultipartUpload multipartUpload; - - private final int partSize; - - private final String serverSideEncryptionAlgorithm; - - private boolean closed; - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client) { - this(bucketName, key, partSize, client, null); - } - - @SuppressFBWarnings(value = "EI_EXPOSE_REP2", justification = "AmazonS3 client is mutable") - public S3OutputStream(final String bucketName, final String key, final int partSize, final AmazonS3 client, - final String serverSideEncryptionAlgorithm) { - super(); - this.bucketName = bucketName; - this.key = key; - this.client = client; - this.partSize = partSize; - this.byteBuffer = 
ByteBuffer.allocate(partSize); - this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - } - - @Override - public void write(final int singleByte) throws IOException { - write(new byte[] { (byte) singleByte }, 0, 1); - } - - @Override - public void write(final byte[] bytes, final int off, final int len) throws IOException { - if (Objects.isNull(bytes) || bytes.length == 0) { - return; - } - if (Objects.isNull(multipartUpload)) { - multipartUpload = newMultipartUpload(); - } - final var source = ByteBuffer.wrap(bytes, off, len); - while (source.hasRemaining()) { - final var transferred = Math.min(byteBuffer.remaining(), source.remaining()); - final var offset = source.arrayOffset() + source.position(); - byteBuffer.put(source.array(), offset, transferred); - source.position(source.position() + transferred); - if (!byteBuffer.hasRemaining()) { - flushBuffer(0, partSize, partSize); - } - } - } - - private MultipartUpload newMultipartUpload() throws IOException { - logger.debug("Create new multipart upload request"); - final var initialRequest = new InitiateMultipartUploadRequest(bucketName, key); - initialRequest.setObjectMetadata(this.buildObjectMetadata()); - final var initiateResult = client.initiateMultipartUpload(initialRequest); - logger.debug("Upload ID: {}", initiateResult.getUploadId()); - return new MultipartUpload(initiateResult.getUploadId()); - } - - private ObjectMetadata buildObjectMetadata() { - final ObjectMetadata metadata = new ObjectMetadata(); - - if (this.serverSideEncryptionAlgorithm != null) { - metadata.setSSEAlgorithm(this.serverSideEncryptionAlgorithm); - } - - return metadata; - } - - @Override - public void close() throws IOException { - if (closed) { - return; - } - if (byteBuffer.position() > 0 && Objects.nonNull(multipartUpload)) { - flushBuffer(byteBuffer.arrayOffset(), byteBuffer.position(), byteBuffer.position()); - } - if (Objects.nonNull(multipartUpload)) { - multipartUpload.complete(); - multipartUpload = null; // 
NOPMD NullAssignment - } - closed = true; - super.close(); - } - - private void flushBuffer(final int offset, final int length, final int partSize) throws IOException { - try { - multipartUpload.uploadPart(new ByteArrayInputStream(byteBuffer.array(), offset, length), partSize); - byteBuffer.clear(); - } catch (final Exception e) { // NOPMD AvoidCatchingGenericException - multipartUpload.abort(); - multipartUpload = null; // NOPMD NullAssignment - throw new IOException(e); - } - } - - private class MultipartUpload { - - private final String uploadId; - - private final List partETags = new ArrayList<>(); - - public MultipartUpload(final String uploadId) { - this.uploadId = uploadId; - } - - public void uploadPart(final InputStream inputStream, final int partSize) throws IOException { - final var partNumber = partETags.size() + 1; - final var uploadPartRequest = new UploadPartRequest().withBucketName(bucketName) - .withKey(key) - .withUploadId(uploadId) - .withPartSize(partSize) - .withPartNumber(partNumber) - .withInputStream(inputStream); - final var uploadResult = client.uploadPart(uploadPartRequest); - partETags.add(uploadResult.getPartETag()); - } - - public void complete() { - client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)); - } - - public void abort() { - client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId)); - } - - } - -} diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index 980b44359..b701ea85d 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import 
java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.Collections; import java.util.stream.Stream; @@ -38,7 +39,6 @@ import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,8 +64,7 @@ void testIteratorProcessesS3Objects() throws Exception { final String key = "topic-00001-abc123.txt"; // Mock InputStream - try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { + try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) @@ -96,8 +95,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { final String key = "topic-00001-abc123.txt"; // Mock InputStream - try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}), - null);) { + try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); // With ByteArrayTransformer From ec1ccdfddfc31dc5029d3d1265dff797d28d38d8 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Thu, 19 Dec 2024 18:07:21 +0000 Subject: [PATCH 5/5] Update to fix a few comments on naming conventions and a missing max retry Signed-off-by: Aindriu Lavelle --- .../kafka/connect/config/s3/S3ConfigFragment.java | 10 +++++----- .../connect/iam/AwsCredentialProviderFactory.java | 4 ++-- .../connect/s3/source/config/S3ClientFactory.java | 3 ++- .../kafka/connect/s3/source/config/S3SourceConfig.java | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git 
a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 672409565..2ece623bf 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -347,8 +347,8 @@ public void validateCredentials() { } } else { final BasicAWSCredentials awsCredentials = getAwsCredentials(); - final AwsBasicCredentials awsV2Credentials = getAwsV2Credentials(); - if (awsCredentials == null && awsV2Credentials == null) { + final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2(); + if (awsCredentials == null && awsCredentialsV2 == null) { LOGGER.info( "Connector use {} as credential Provider, " + "when configuration for {{}, {}} OR {{}, {}} are absent", @@ -435,7 +435,7 @@ public BasicAWSCredentials getAwsCredentials() { return null; } - public AwsBasicCredentials getAwsV2Credentials() { + public AwsBasicCredentials getAwsCredentialsV2() { if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { @@ -467,7 +467,7 @@ public Region getAwsS3Region() { } } - public software.amazon.awssdk.regions.Region getAwsV2S3Region() { + public software.amazon.awssdk.regions.Region getAwsS3RegionV2() { // we have priority of properties if old one not set or both old and new one set // the new property value will be selected if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) { @@ -515,7 +515,7 @@ public AWSCredentialsProvider getCustomCredentialsProvider() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); } - public AwsCredentialsProvider getCustomV2CredentialsProvider() { + public AwsCredentialsProvider getCustomCredentialsProviderV2() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class); } diff 
--git a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java index 52e7d4b91..167d872a7 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -69,9 +69,9 @@ public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) { if (config.hasAwsStsRole()) { return getV2StsProvider(config); } - final AwsBasicCredentials awsCredentials = config.getAwsV2Credentials(); + final AwsBasicCredentials awsCredentials = config.getAwsCredentialsV2(); if (Objects.isNull(awsCredentials)) { - return config.getCustomV2CredentialsProvider(); + return config.getCustomCredentialsProviderV2(); } return StaticCredentialsProvider.create(awsCredentials); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java index 451311cd2..13ff4d690 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java @@ -45,7 +45,8 @@ public S3Client createAmazonS3Client(final S3SourceConfig config) { if (Objects.isNull(config.getAwsS3EndPoint())) { return S3Client.builder() .overrideConfiguration(clientOverrideConfiguration) - .overrideConfiguration(o -> o.retryStrategy(r -> r.backoffStrategy(backoffStrategy))) + .overrideConfiguration(o -> o.retryStrategy( + r -> r.backoffStrategy(backoffStrategy).maxAttempts(config.getS3RetryBackoffMaxRetries()))) .region(config.getAwsS3Region()) .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) .build(); diff --git 
a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index d92f448ce..23dc69e9e 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -86,7 +86,7 @@ public AwsStsEndpointConfig getStsEndpointConfig() { } public AwsBasicCredentials getAwsCredentials() { - return s3ConfigFragment.getAwsV2Credentials(); + return s3ConfigFragment.getAwsCredentialsV2(); } public String getAwsS3EndPoint() { @@ -94,7 +94,7 @@ public String getAwsS3EndPoint() { } public Region getAwsS3Region() { - return s3ConfigFragment.getAwsV2S3Region(); + return s3ConfigFragment.getAwsS3RegionV2(); } public String getAwsS3BucketName() {