
Commit

Update to use AWS SDK v2 dependencies
Leon Lin committed Apr 17, 2024
1 parent 0461ef8 commit 1e45a68
Showing 8 changed files with 140 additions and 116 deletions.
7 changes: 5 additions & 2 deletions build.sbt
@@ -42,7 +42,7 @@ spark / sparkVersion := getSparkVersion()
// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
val flinkVersion = "1.16.1"
val hadoopVersion = "3.3.4"
val hadoopVersion = "3.4.0"
val scalaTestVersion = "3.2.15"
val scalaTestVersionForConnectors = "3.0.8"
val parquet4sVersion = "1.9.4"
@@ -400,6 +400,9 @@ lazy val storage = (project in file("storage"))
// is not compatible with 3.3.2.
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",

// AMZN: aws sdk v2 dependency
"software.amazon.awssdk" % "s3" % "2.23.18" % "provided",

// Test Deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
),
@@ -421,7 +424,7 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
// Test / publishArtifact := true,

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",
"software.amazon.awssdk" % "dynamodb" % "2.23.18" % "provided",

// Test Deps
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "test", // RemoteFileChangedException
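Note (not part of the diff itself): both new artifacts are AWS SDK v2 service modules, and because they are marked "provided" the runtime classpath has to supply them. Below is a minimal sketch of building clients from these modules, assuming default credentials; the class name and region are illustrative only.

```java
// Sketch only (not from the repository): constructing clients from the two v2 modules
// declared above. Region and credentials settings are placeholders.
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.s3.S3Client;

public class SdkV2Clients {
    public static void main(String[] args) {
        try (S3Client s3 = S3Client.builder()
                     .region(Region.US_EAST_1)
                     .credentialsProvider(DefaultCredentialsProvider.create())
                     .build();
             DynamoDbClient dynamoDb = DynamoDbClient.builder()
                     .region(Region.US_EAST_1)
                     .credentialsProvider(DefaultCredentialsProvider.create())
                     .build()) {
            System.out.println("Created SDK v2 clients: " + s3 + ", " + dynamoDb);
        }
    }
}
```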
6 changes: 3 additions & 3 deletions storage-s3-dynamodb/integration_tests/dynamodb_logstore.py
@@ -61,9 +61,9 @@
./run-integration-tests.py --use-local \
--run-storage-s3-dynamodb-integration-tests \
--dbb-packages org.apache.hadoop:hadoop-aws:3.3.1 \
--dbb-conf io.delta.storage.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider \
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
--dbb-packages org.apache.hadoop:hadoop-aws:3.4.0 \
--dbb-conf io.delta.storage.credentials.provider=software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider \
spark.hadoop.fs.s3a.aws.credentials.provider=software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
"""

# ===== Mandatory input from user =====
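The updated command points both the LogStore and the S3A connector (which, from hadoop-aws 3.4.0 on, is built against the v2 SDK) at the v2 ProfileCredentialsProvider. For reference, a sketch of how that provider is obtained in code: v1 used a constructor, v2 uses static create() factories. The profile name below is a made-up example.

```java
// Illustrative only: the SDK v2 ProfileCredentialsProvider named in the command above is
// obtained through a static factory. The "delta-it" profile name is a made-up example.
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;

public class ProfileProviderExample {
    public static void main(String[] args) {
        try (ProfileCredentialsProvider provider = ProfileCredentialsProvider.create("delta-it")) {
            AwsCredentials credentials = provider.resolveCredentials();
            System.out.println("Resolved access key id: " + credentials.accessKeyId());
        }
    }
}
```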
@@ -30,30 +30,30 @@

import org.apache.hadoop.conf.Configuration;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.Condition;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ComparisonOperator;
import software.amazon.awssdk.services.dynamodb.model.Condition;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.ExpectedAttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.TableStatus;

/**
* A concrete implementation of {@link BaseExternalLogStore} that uses an external DynamoDB table
@@ -103,7 +103,7 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
/**
* Member fields
*/
private final AmazonDynamoDBClient client;
private final DynamoDbClient client;
private final String tableName;
private final String credentialsProviderName;
private final String regionName;
@@ -116,7 +116,7 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
credentialsProviderName = getParam(
hadoopConf,
DDB_CLIENT_CREDENTIALS_PROVIDER,
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
"software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider"
);
regionName = getParam(hadoopConf, DDB_CLIENT_REGION, "us-east-1");
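The default provider string moves from the v1 DefaultAWSCredentialsProviderChain to the v2 DefaultCredentialsProvider. Worth noting, and the reason for the ReflectionUtils change later in this commit: v2 providers are generally obtained through static factory methods rather than public constructors, roughly as in this sketch (the wrapper class is made up).

```java
// Sketch only: the v1 -> v2 construction difference that the reflection code in this
// commit has to account for. Provider class and method names are from the AWS SDK.
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

public class DefaultProviderExample {
    public static AwsCredentialsProvider defaultProvider() {
        // v1: new DefaultAWSCredentialsProviderChain()
        // v2: no public constructor; use the static factory instead
        return DefaultCredentialsProvider.create();
    }
}
```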

@@ -182,33 +182,35 @@ protected Optional<ExternalCommitEntry> getExternalEntry(
String tablePath,
String fileName) {
final Map<String, AttributeValue> attributes = new ConcurrentHashMap<>();
attributes.put(ATTR_TABLE_PATH, new AttributeValue(tablePath));
attributes.put(ATTR_FILE_NAME, new AttributeValue(fileName));
attributes.put(ATTR_TABLE_PATH, AttributeValue.builder().s(tablePath).build());
attributes.put(ATTR_FILE_NAME, AttributeValue.builder().s(fileName).build());

Map<String, AttributeValue> item = client.getItem(
new GetItemRequest(tableName, attributes).withConsistentRead(true)
).getItem();
Map<String, AttributeValue> item = client.getItem(GetItemRequest.builder()
.tableName(tableName)
.consistentRead(true)
.key(attributes)
.build()
).item();

return item != null ? Optional.of(dbResultToCommitEntry(item)) : Optional.empty();
return (item != null && !item.isEmpty()) ? Optional.of(dbResultToCommitEntry(item)) : Optional.empty();
}

@Override
protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
final Map<String, Condition> conditions = new ConcurrentHashMap<>();
conditions.put(
ATTR_TABLE_PATH,
new Condition()
.withComparisonOperator(ComparisonOperator.EQ)
.withAttributeValueList(new AttributeValue(tablePath.toString()))
);

final List<Map<String,AttributeValue>> items = client.query(
new QueryRequest(tableName)
.withConsistentRead(true)
.withScanIndexForward(false)
.withLimit(1)
.withKeyConditions(conditions)
).getItems();
conditions.put(ATTR_TABLE_PATH, Condition.builder() // Updated Condition builder
.comparisonOperator(ComparisonOperator.EQ)
.attributeValueList(AttributeValue.builder().s(tablePath.toString()).build())
.build());

final List<Map<String,AttributeValue>> items = client.query(QueryRequest.builder() // Updated QueryRequest
.tableName(tableName)
.consistentRead(true)
.scanIndexForward(false)
.limit(1)
.keyConditions(conditions)
.build()
).items();

if (items.isEmpty()) {
return Optional.empty();
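Aside, not part of this commit: the rewritten query keeps the legacy keyConditions/ComparisonOperator parameters, which SDK v2 still accepts. The same lookup written with the key-condition-expression style the v2 API otherwise favors could look like the sketch below; the helper class and its parameters are illustrative, with the key attribute name passed in rather than assumed.

```java
// Illustrative alternative to the keyConditions-based query above, using a key condition
// expression. Not used by the commit; shown only for comparison.
import java.util.Collections;
import java.util.List;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;

public class LatestEntryQueryExample {
    public static Map<String, AttributeValue> latestItem(
            DynamoDbClient client, String tableName, String keyAttrName, String tablePath) {
        List<Map<String, AttributeValue>> items = client.query(QueryRequest.builder()
                .tableName(tableName)
                .consistentRead(true)
                .scanIndexForward(false)   // descending order on the sort key, as above
                .limit(1)
                .keyConditionExpression("#p = :path")
                .expressionAttributeNames(Collections.singletonMap("#p", keyAttrName))
                .expressionAttributeValues(Collections.singletonMap(
                        ":path", AttributeValue.builder().s(tablePath).build()))
                .build()).items();
        return items.isEmpty() ? null : items.get(0);
    }
}
```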
@@ -223,37 +225,34 @@ protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
private ExternalCommitEntry dbResultToCommitEntry(Map<String, AttributeValue> item) {
final AttributeValue expireTimeAttr = item.get(ATTR_EXPIRE_TIME);
return new ExternalCommitEntry(
new Path(item.get(ATTR_TABLE_PATH).getS()),
item.get(ATTR_FILE_NAME).getS(),
item.get(ATTR_TEMP_PATH).getS(),
item.get(ATTR_COMPLETE).getS().equals("true"),
expireTimeAttr != null ? Long.parseLong(expireTimeAttr.getN()) : null
new Path(item.get(ATTR_TABLE_PATH).s()),
item.get(ATTR_FILE_NAME).s(),
item.get(ATTR_TEMP_PATH).s(),
Boolean.parseBoolean(item.get(ATTR_COMPLETE).s()),
expireTimeAttr != null ? Long.parseLong(expireTimeAttr.n()) : null
);
}

private PutItemRequest createPutItemRequest(ExternalCommitEntry entry, boolean overwrite) {
final Map<String, AttributeValue> attributes = new ConcurrentHashMap<>();
attributes.put(ATTR_TABLE_PATH, new AttributeValue(entry.tablePath.toString()));
attributes.put(ATTR_FILE_NAME, new AttributeValue(entry.fileName));
attributes.put(ATTR_TEMP_PATH, new AttributeValue(entry.tempPath));
attributes.put(
ATTR_COMPLETE,
new AttributeValue().withS(Boolean.toString(entry.complete))
);
attributes.put(ATTR_TABLE_PATH, AttributeValue.builder().s(entry.tablePath.toString()).build());
attributes.put(ATTR_FILE_NAME, AttributeValue.builder().s(entry.fileName).build());
attributes.put(ATTR_TEMP_PATH, AttributeValue.builder().s(entry.tempPath).build());
attributes.put(ATTR_COMPLETE, AttributeValue.builder().s(Boolean.toString(entry.complete)).build());

if (entry.expireTime != null) {
attributes.put(
ATTR_EXPIRE_TIME,
new AttributeValue().withN(entry.expireTime.toString())
);
attributes.put(ATTR_EXPIRE_TIME, AttributeValue.builder().n(entry.expireTime.toString()).build());
}

final PutItemRequest pr = new PutItemRequest(tableName, attributes);
PutItemRequest pr = PutItemRequest.builder() // Updated PutItemRequest builder
.tableName(tableName)
.item(attributes)
.build();

if (!overwrite) {
Map<String, ExpectedAttributeValue> expected = new ConcurrentHashMap<>();
expected.put(ATTR_FILE_NAME, new ExpectedAttributeValue(false));
pr.withExpected(expected);
expected.put(ATTR_FILE_NAME, ExpectedAttributeValue.builder().exists(false).build());
pr = pr.toBuilder().expected(expected).build();
}

return pr;
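Also not part of this commit: the "do not overwrite" guard still goes through the legacy ExpectedAttributeValue API carried over from v1. In v2 the same guard is usually expressed as a condition expression, along the lines of the sketch below (attribute name passed in rather than assumed); either way DynamoDB rejects the write with a ConditionalCheckFailedException when the item already exists.

```java
// Illustrative v2-style equivalent of the ExpectedAttributeValue guard above: reject the
// put when an item with this key already exists. Not used by the commit.
import java.util.Collections;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class ConditionalPutExample {
    public static PutItemRequest putIfAbsent(
            String tableName, Map<String, AttributeValue> item, String fileNameAttr) {
        return PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .conditionExpression("attribute_not_exists(#f)")
                .expressionAttributeNames(Collections.singletonMap("#f", fileNameAttr))
                .build();
    }
}
```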
@@ -263,12 +262,12 @@ private void tryEnsureTableExists(Configuration hadoopConf) throws IOException {
int retries = 0;
boolean created = false;
while(retries < 20) {
String status = "CREATING";
TableStatus status = TableStatus.CREATING;
try {
// https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/model/TableDescription.html#getTableStatus--
DescribeTableResult result = client.describeTable(tableName);
TableDescription descr = result.getTable();
status = descr.getTableStatus();
DescribeTableResponse response = client.describeTable(request -> request.tableName(tableName));
TableDescription table = response.table();
status = table.tableStatus();
} catch (ResourceNotFoundException e) {
final long rcu = Long.parseLong(getParam(hadoopConf, DDB_CREATE_TABLE_RCU, "5"));
final long wcu = Long.parseLong(getParam(hadoopConf, DDB_CREATE_TABLE_WCU, "5"));
@@ -278,33 +277,28 @@ private void tryEnsureTableExists(Configuration hadoopConf) throws IOException {
"Creating it now with provisioned throughput of {} RCUs and {} WCUs.",
tableName, regionName, rcu, wcu);
try {
client.createTable(
// attributeDefinitions
java.util.Arrays.asList(
new AttributeDefinition(ATTR_TABLE_PATH, ScalarAttributeType.S),
new AttributeDefinition(ATTR_FILE_NAME, ScalarAttributeType.S)
),
tableName,
// keySchema
Arrays.asList(
new KeySchemaElement(ATTR_TABLE_PATH, KeyType.HASH),
new KeySchemaElement(ATTR_FILE_NAME, KeyType.RANGE)
),
new ProvisionedThroughput(rcu, wcu)
);
client.createTable(request -> request
.attributeDefinitions(
AttributeDefinition.builder().attributeName(ATTR_TABLE_PATH).attributeType(ScalarAttributeType.S).build(),
AttributeDefinition.builder().attributeName(ATTR_FILE_NAME).attributeType(ScalarAttributeType.S).build())
.tableName(tableName)
.keySchema(
KeySchemaElement.builder().attributeName(ATTR_TABLE_PATH).keyType(KeyType.HASH).build(),
KeySchemaElement.builder().attributeName(ATTR_FILE_NAME).keyType(KeyType.RANGE).build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(rcu).writeCapacityUnits(wcu).build()));
created = true;
} catch (ResourceInUseException e3) {
// race condition - table just created by concurrent process
}
}
if (status.equals("ACTIVE")) {
if (status == TableStatus.ACTIVE) {
if (created) {
LOG.info("Successfully created DynamoDB table `{}`", tableName);
} else {
LOG.info("Table `{}` already exists", tableName);
}
break;
} else if (status.equals("CREATING")) {
} else if (status == TableStatus.CREATING) {
retries += 1;
LOG.info("Waiting for `{}` table creation", tableName);
try {
@@ -319,12 +313,14 @@ private void tryEnsureTableExists(Configuration hadoopConf) throws IOException {
};
}

private AmazonDynamoDBClient getClient() throws java.io.IOException {
private DynamoDbClient getClient() throws java.io.IOException {
try {
final AWSCredentialsProvider awsCredentialsProvider =
final AwsCredentialsProvider awsCredentialsProvider =
ReflectionUtils.createAwsCredentialsProvider(credentialsProviderName, initHadoopConf());
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(awsCredentialsProvider);
client.setRegion(Region.getRegion(Regions.fromName(regionName)));
final DynamoDbClient client = DynamoDbClient.builder()
.region(Region.of(regionName))
.credentialsProvider(awsCredentialsProvider)
.build();
return client;
} catch (ReflectiveOperationException e) {
throw new java.io.IOException(e);
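One more aside: tryEnsureTableExists still polls describeTable in its own retry loop. SDK v2 also ships built-in waiters that could do this waiting, roughly as sketched below; this is an option, not what the commit implements.

```java
// Sketch of SDK v2's built-in waiter as an alternative to the manual describeTable polling
// loop above. Not part of the commit.
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class TableWaiterExample {
    public static void waitUntilActive(DynamoDbClient client, String tableName) {
        try (DynamoDbWaiter waiter = DynamoDbWaiter.builder().client(client).build()) {
            WaiterResponse<DescribeTableResponse> waiterResponse =
                    waiter.waitUntilTableExists(request -> request.tableName(tableName));
            waiterResponse.matched().response().ifPresent(
                    r -> System.out.println("Table status: " + r.table().tableStatus()));
        }
    }
}
```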
@@ -16,9 +16,10 @@

package io.delta.storage.utils;

import com.amazonaws.auth.AWSCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import org.apache.hadoop.conf.Configuration;

import java.lang.reflect.Method;
import java.util.Arrays;

public class ReflectionUtils {
@@ -39,21 +40,29 @@ private static boolean readsCredsFromHadoopConf(Class<?> awsCredentialsProviderClass)
* @param credentialsProviderClassName Fully qualified name of the desired credentials provider class.
* @param hadoopConf Hadoop configuration, used to create an instance of the AWS credentials
* provider, if supported.
* @return {@link AWSCredentialsProvider} object, instantiated from the class @see {credentialsProviderClassName}
* @return {@link AwsCredentialsProvider} object, instantiated from the class named by {@code credentialsProviderClassName}
* @throws ReflectiveOperationException When the AWS credentials provider cannot be constructed
* reflectively, i.e. the class has neither a constructor taking only a Hadoop
* Configuration, a static create() factory method, nor a no-arg constructor.
*/
public static AWSCredentialsProvider createAwsCredentialsProvider(
public static AwsCredentialsProvider createAwsCredentialsProvider(
String credentialsProviderClassName,
Configuration hadoopConf) throws ReflectiveOperationException {
Class<?> awsCredentialsProviderClass = Class.forName(credentialsProviderClassName);
if (readsCredsFromHadoopConf(awsCredentialsProviderClass))
return (AWSCredentialsProvider) awsCredentialsProviderClass
if (readsCredsFromHadoopConf(awsCredentialsProviderClass)) {
return (AwsCredentialsProvider) awsCredentialsProviderClass
.getConstructor(Configuration.class)
.newInstance(hadoopConf);
else
return (AWSCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
} else {
try {
// Try to use the static create() method
Method createMethod = awsCredentialsProviderClass.getMethod("create");
return (AwsCredentialsProvider) createMethod.invoke(null);
} catch (NoSuchMethodException e) {
// Fall back to the empty constructor if create() method is not available
return (AwsCredentialsProvider) awsCredentialsProviderClass.getConstructor().newInstance();
}
}
}

}
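For completeness, here is a hypothetical custom provider that this reflection logic could load through its Configuration-constructor path; the class, its configuration keys, and its behavior are invented for illustration. A class like this would be wired in by fully qualified name via io.delta.storage.credentials.provider, as shown in the integration-test instructions above.

```java
// Hypothetical example of a user-supplied SDK v2 credentials provider that
// ReflectionUtils.createAwsCredentialsProvider could instantiate via its
// getConstructor(Configuration.class) path. Config key names are illustrative only.
import org.apache.hadoop.conf.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

public class HadoopConfCredentialsProvider implements AwsCredentialsProvider {
    private final String accessKey;
    private final String secretKey;

    public HadoopConfCredentialsProvider(Configuration conf) {
        this.accessKey = conf.get("example.aws.access.key");  // illustrative key name
        this.secretKey = conf.get("example.aws.secret.key");  // illustrative key name
    }

    @Override
    public AwsCredentials resolveCredentials() {
        return AwsBasicCredentials.create(accessKey, secretKey);
    }
}
```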