AWS SDK 2.X migration for source connector [KCON-84] #374

Merged
3 changes: 3 additions & 0 deletions s3-commons/build.gradle.kts
@@ -18,10 +18,13 @@ plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.777")
val amazonSTSVersion by extra("1.12.777")
val amazonV2Version by extra("2.29.34")
Contributor
This can be updated to 2.29.36 😆 Please try to keep up!!

(Just kidding, but we should keep in mind to bump it to the most recent patch version before release!)


dependencies {
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")
implementation("software.amazon.awssdk:auth:$amazonV2Version")
implementation("software.amazon.awssdk:sts:$amazonV2Version")

implementation(project(":commons"))

S3ConfigFragment.java
@@ -41,11 +41,13 @@
import com.amazonaws.services.s3.internal.BucketNameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

/**
* The configuration fragment that defines the S3 specific characteristics.
*/
@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports" })
@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports", "PMD.GodClass" })
public final class S3ConfigFragment extends ConfigFragment {

private static final Logger LOGGER = LoggerFactory.getLogger(S3ConfigFragment.class);
@@ -345,7 +347,8 @@ public void validateCredentials() {
}
} else {
final BasicAWSCredentials awsCredentials = getAwsCredentials();
if (awsCredentials == null) {
final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2();
if (awsCredentials == null && awsCredentialsV2 == null) {
LOGGER.info(
"Connector use {} as credential Provider, "
+ "when configuration for {{}, {}} OR {{}, {}} are absent",
@@ -410,11 +413,13 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return new AwsStsEndpointConfig(cfg.getString(AWS_STS_CONFIG_ENDPOINT), cfg.getString(AWS_S3_REGION_CONFIG));
}

@Deprecated
public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
final AwsStsEndpointConfig config = getStsEndpointConfig();
return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion());
}

@Deprecated
public BasicAWSCredentials getAwsCredentials() {
if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {
@@ -430,12 +435,26 @@ public BasicAWSCredentials getAwsCredentials() {
return null;
}

public AwsBasicCredentials getAwsCredentialsV2() {
if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {

return AwsBasicCredentials.create(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(),
cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value());
} else if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID))
&& Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY))) {
LOGGER.warn("Config options {} and {} are not supported for this Connector", AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY);
}
return null;
}

public String getAwsS3EndPoint() {
return Objects.nonNull(cfg.getString(AWS_S3_ENDPOINT_CONFIG))
? cfg.getString(AWS_S3_ENDPOINT_CONFIG)
: cfg.getString(AWS_S3_ENDPOINT);
}

@Deprecated
public Region getAwsS3Region() {
// we have priority of properties if old one not set or both old and new one set
// the new property value will be selected
@@ -448,6 +467,18 @@ public Region getAwsS3Region() {
}
}

public software.amazon.awssdk.regions.Region getAwsS3RegionV2() {
// we have priority of properties if old one not set or both old and new one set
// the new property value will be selected
if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) {
return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION_CONFIG));
} else if (Objects.nonNull(cfg.getString(AWS_S3_REGION))) {
return software.amazon.awssdk.regions.Region.of(cfg.getString(AWS_S3_REGION));
} else {
return software.amazon.awssdk.regions.Region.of(Regions.US_EAST_1.getName());
}
}

public String getAwsS3BucketName() {
return Objects.nonNull(cfg.getString(AWS_S3_BUCKET_NAME_CONFIG))
? cfg.getString(AWS_S3_BUCKET_NAME_CONFIG)
@@ -484,6 +515,10 @@ public AWSCredentialsProvider getCustomCredentialsProvider() {
return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
}

public AwsCredentialsProvider getCustomCredentialsProviderV2() {
return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class);
}
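
A note on getCustomCredentialsProviderV2: because it resolves the configured class against the v2 AwsCredentialsProvider interface, a custom provider written for the v1 AWSCredentialsProvider interface will no longer satisfy it. A minimal sketch of a v2-compatible provider (illustrative only — the class name and the environment-variable source are hypothetical, not part of this change):

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

// Hypothetical custom provider targeting the v2 credentials interface.
public class ExampleEnvCredentialsProvider implements AwsCredentialsProvider {
    @Override
    public AwsCredentials resolveCredentials() {
        // Resolve the key pair from wherever the deployment keeps it; environment variables here purely as an example.
        return AwsBasicCredentials.create(
                System.getenv("EXAMPLE_ACCESS_KEY_ID"),
                System.getenv("EXAMPLE_SECRET_ACCESS_KEY"));
    }
}

A class like this also needs a public no-argument constructor so that getConfiguredInstance can instantiate it.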

public int getFetchPageSize() {
return cfg.getInt(FETCH_PAGE_SIZE);
}
AwsCredentialProviderFactory.java
@@ -26,6 +26,11 @@
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class AwsCredentialProviderFactory {

@@ -58,4 +63,33 @@ private AWSSecurityTokenService securityTokenService(final S3ConfigFragment conf
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
}

public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) {

if (config.hasAwsStsRole()) {
return getV2StsProvider(config);
}
final AwsBasicCredentials awsCredentials = config.getAwsCredentialsV2();
if (Objects.isNull(awsCredentials)) {
return config.getCustomCredentialsProviderV2();
}
return StaticCredentialsProvider.create(awsCredentials);

}

private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) {
if (config.hasAwsStsRole()) {
return StsAssumeRoleCredentialsProvider.builder()
.refreshRequest(() -> AssumeRoleRequest.builder()
.roleArn(config.getStsRole().getArn())
// Make this a unique identifier
.roleSessionName("AwsV2SDKConnectorSession")
.build())
.build();
}

return StsAssumeRoleCredentialsProvider.builder().build();

}

}
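
One thing worth noting about getV2StsProvider above: the v2 assume-role provider is built without an explicit STS client, so it falls back to the SDK's default region and endpoint resolution, whereas the v1 path builds its STS client via securityTokenService(config), which can take the connector's STS endpoint settings into account. A hedged sketch of how the same wiring could look in v2, reusing getters shown earlier on S3ConfigFragment (hasStsEndpointConfig() is an assumed helper here; the diff only shows getStsEndpointConfig()):

import java.net.URI;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;

private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) {
    // Build an STS client that honours the connector's region and optional STS endpoint override.
    final StsClientBuilder stsBuilder = StsClient.builder().region(config.getAwsS3RegionV2());
    if (config.hasStsEndpointConfig()) { // assumed helper, mirroring what the v1 branch checks
        stsBuilder.endpointOverride(URI.create(config.getStsEndpointConfig().getServiceEndpoint()));
    }
    return StsAssumeRoleCredentialsProvider.builder()
            .stsClient(stsBuilder.build())
            .refreshRequest(() -> AssumeRoleRequest.builder()
                    .roleArn(config.getStsRole().getArn())
                    .roleSessionName("AwsV2SDKConnectorSession")
                    .build())
            .build();
}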
9 changes: 4 additions & 5 deletions s3-source-connector/build.gradle.kts
@@ -18,8 +18,8 @@ import com.github.spotbugs.snom.SpotBugsTask

plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.729")
val amazonSTSVersion by extra("1.12.729")
val amazonS3Version by extra("2.29.34")
val amazonSTSVersion by extra("2.29.34")
val s3mockVersion by extra("0.2.6")
val testKafkaVersion by extra("3.7.1")

@@ -67,8 +67,8 @@ dependencies {

implementation(project(":commons"))
implementation(project(":s3-commons"))
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")
implementation("software.amazon.awssdk:s3:$amazonS3Version")
implementation("software.amazon.awssdk:sts:$amazonSTSVersion")

implementation(tools.spotbugs.annotations)
implementation(logginglibs.slf4j)
@@ -154,7 +154,6 @@ dependencies {
exclude(group = "org.apache.commons", module = "commons-math3")
exclude(group = "org.apache.httpcomponents", module = "httpclient")
exclude(group = "commons-codec", module = "commons-codec")
exclude(group = "commons-io", module = "commons-io")
exclude(group = "commons-net", module = "commons-net")
exclude(group = "org.eclipse.jetty")
exclude(group = "org.eclipse.jetty.websocket")
IntegrationBase.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@@ -47,11 +48,6 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,6 +57,10 @@
import org.testcontainers.containers.Container;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public interface IntegrationBase {
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
@@ -101,13 +101,13 @@ static void waitForRunningContainer(final Container<?> container) {
await().atMost(Duration.ofMinutes(1)).until(container::isRunning);
}

static AmazonS3 createS3Client(final LocalStackContainer localStackContainer) {
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString(),
localStackContainer.getRegion()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
static S3Client createS3Client(final LocalStackContainer localStackContainer) {
return S3Client.builder()
.endpointOverride(
URI.create(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString()))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials
.create(localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
.build();
}
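
Depending on how the LocalStack endpoint is set up, the v2 client's default virtual-hosted-style addressing (bucket name in the hostname) may not resolve; if the integration tests hit that, path-style access can be enabled on the same builder. A sketch of that variant, with only the serviceConfiguration line added (whether it is actually needed is an assumption, since the container setup isn't shown in this diff):

import software.amazon.awssdk.services.s3.S3Configuration;

static S3Client createS3Client(final LocalStackContainer localStackContainer) {
    return S3Client.builder()
            .endpointOverride(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3))
            .region(Region.of(localStackContainer.getRegion()))
            .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials
                    .create(localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
            // Resolve buckets as path segments rather than host names, which LocalStack handles more reliably.
            .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
            .build();
}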

IntegrationTest.java
@@ -33,7 +33,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -62,9 +61,6 @@
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
@@ -83,6 +79,9 @@
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@Testcontainers
@SuppressWarnings("PMD.ExcessiveImports")
@@ -111,7 +110,7 @@ final class IntegrationTest implements IntegrationBase {
private AdminClient adminClient;
private ConnectRunner connectRunner;

private static AmazonS3 s3Client;
private static S3Client s3Client;

@BeforeAll
static void setUpAll() throws IOException, InterruptedException {
@@ -263,7 +262,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
final Path path = ContentUtils.getTmpFilePath(name);

try {
s3Client.putObject(TEST_BUCKET_NAME, fileName, Files.newInputStream(path), null);
s3Client.putObject(PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(fileName).build(), path);
} catch (final Exception e) { // NOPMD broad exception caught
LOGGER.error("Error in reading file {}", e.getMessage(), e);
} finally {
Expand Down Expand Up @@ -341,9 +340,8 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-"
+ System.currentTimeMillis() + ".txt";
final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey,
new ByteArrayInputStream(testDataBytes), new ObjectMetadata());
s3Client.putObject(request);
final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build();
s3Client.putObject(request, RequestBody.fromBytes(testDataBytes));
return OBJECT_KEY + SEPARATOR + objectKey;
}
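
For symmetry with writeToS3 above, test assertions that read objects back would use the same request-object style; a small sketch (not part of this PR — getObjectAsBytes is simply the v2 counterpart of the v1 getObject-and-read pattern):

import software.amazon.awssdk.services.s3.model.GetObjectRequest;

private static byte[] readFromS3(final String objectKey) {
    // Fetch the object body as a byte array using the shared v2 client.
    return s3Client
            .getObjectAsBytes(GetObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build())
            .asByteArray();
}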

S3SourceTask.java
@@ -40,10 +40,10 @@
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;

/**
* S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka
@@ -64,7 +64,7 @@ public class S3SourceTask extends SourceTask {
private static final long ERROR_BACKOFF = 1000L;

private S3SourceConfig s3SourceConfig;
private AmazonS3 s3Client;
private S3Client s3Client;

private Iterator<S3SourceRecord> sourceRecordIterator;
private Transformer transformer;
@@ -122,8 +122,8 @@ public List<SourceRecord> poll() throws InterruptedException {
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
} catch (AmazonS3Exception exception) {
if (exception.isRetryable()) {
} catch (SdkException exception) {
if (exception.retryable()) {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
pollLock.wait(ERROR_BACKOFF);
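
The hunks above switch the task's client field and retry handling to the v2 types, but the construction of the S3Client itself falls outside this excerpt. A plausible wiring, assembled only from the fragment and factory methods shown earlier on this page (a sketch under that assumption, not the PR's actual code — in particular, how S3SourceConfig exposes the config fragment is not visible here):

import java.net.URI;
import java.util.Objects;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

private static S3Client createS3Client(final S3ConfigFragment config, final AwsCredentialProviderFactory factory) {
    final S3ClientBuilder builder = S3Client.builder()
            .region(config.getAwsS3RegionV2())
            .credentialsProvider(factory.getAwsV2Provider(config));
    // Apply the configured endpoint only when one is set (e.g. for S3-compatible storage or tests).
    if (Objects.nonNull(config.getAwsS3EndPoint())) {
        builder.endpointOverride(URI.create(config.getAwsS3EndPoint()));
    }
    return builder.build();
}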