From 9ccb1a8ff3dd68129e06ef7d5effc6a9b2e63e47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=A8Claude?= <¨claude.warren@aiven.io¨> Date: Tue, 31 Dec 2024 08:49:51 +0000 Subject: [PATCH] Migrate to new AWS client --- .../kafka/connect/s3/source/S3SourceTask.java | 5 ++--- .../connect/s3/source/S3SourceTaskTest.java | 18 +++++------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 73653659..6baccdb9 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.services.s3.S3Client; /** * S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka @@ -92,8 +91,8 @@ public boolean hasNext() { while (stillPolling()) { try { return s3SourceRecordIterator.hasNext(); - } catch (AmazonS3Exception exception) { - if (exception.isRetryable()) { + } catch (SdkException exception) { + if (exception.retryable()) { LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...", exception); try { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 68246808..580af7df 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -23,7 +23,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -45,9 +46,8 @@ import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; - +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.ConnectUtils; import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; @@ -58,16 +58,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; - -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; - final class S3SourceTaskTest { /** @@ -80,7 +75,6 @@ final class S3SourceTaskTest { private static final String TEST_BUCKET = "test-bucket"; - private static final String TOPIC = "TOPIC1"; private static final int PARTITION = 1; @@ -89,7 +83,6 @@ final class S3SourceTaskTest { // TODO S3Mock has not been maintained in 4 years // Adobe have an alternative we can move to. - private static S3Mock s3Api; private static S3Client s3Client; @@ -120,9 +113,6 @@ public static void setUpClass() throws URISyntaxException { .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) .build(); - - final BucketAccessor testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET); - testBucketAccessor.createBucket(); } @AfterAll @@ -134,6 +124,8 @@ public static void tearDownClass() { public void setUp() { properties = new HashMap<>(commonProperties); s3Client.createBucket(create -> create.bucket(TEST_BUCKET).build()); + // mockedSourceTaskContext = mock(SourceTaskContext.class); + // mockedOffsetStorageReader = mock(OffsetStorageReader.class); } @AfterEach