Skip to content

Commit

Permalink
Migrate to new AWS client
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 31, 2024
1 parent d7b8236 commit 9ccb1a8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;

/**
* S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka
Expand Down Expand Up @@ -92,8 +91,8 @@ public boolean hasNext() {
while (stillPolling()) {
try {
return s3SourceRecordIterator.hasNext();
} catch (AmazonS3Exception exception) {
if (exception.isRetryable()) {
} catch (SdkException exception) {
if (exception.retryable()) {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -45,9 +46,8 @@
import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.config.s3.S3ConfigFragment;

import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.utils.ConnectUtils;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;

Expand All @@ -58,16 +58,11 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;


final class S3SourceTaskTest {

/**
Expand All @@ -80,7 +75,6 @@ final class S3SourceTaskTest {

private static final String TEST_BUCKET = "test-bucket";


private static final String TOPIC = "TOPIC1";

private static final int PARTITION = 1;
Expand All @@ -89,7 +83,6 @@ final class S3SourceTaskTest {

// TODO S3Mock has not been maintained in 4 years
// Adobe have an alternative we can move to.

private static S3Mock s3Api;
private static S3Client s3Client;

Expand Down Expand Up @@ -120,9 +113,6 @@ public static void setUpClass() throws URISyntaxException {
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
.credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment()))
.build();

final BucketAccessor testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET);
testBucketAccessor.createBucket();
}

@AfterAll
Expand All @@ -134,6 +124,8 @@ public static void tearDownClass() {
public void setUp() {
properties = new HashMap<>(commonProperties);
s3Client.createBucket(create -> create.bucket(TEST_BUCKET).build());
// mockedSourceTaskContext = mock(SourceTaskContext.class);
// mockedOffsetStorageReader = mock(OffsetStorageReader.class);
}

@AfterEach
Expand Down

0 comments on commit 9ccb1a8

Please sign in to comment.