Skip to content

Commit

Permalink
feat: Added support to specify AWS profile through S3 instructions (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Oct 3, 2024
1 parent d2873dd commit 15ff4f5
Show file tree
Hide file tree
Showing 20 changed files with 822 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,41 @@
package io.deephaven.iceberg.util;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;

import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeClass
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.LocalStack.init();
LocalStack.init();
}

@Override
public Builder s3Instructions(final Builder builder) {
return SingletonContainers.LocalStack.s3Instructions(builder);
return LocalStack.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.LocalStack.s3AsyncClient();
return LocalStack.s3AsyncClient();
}

@Override
public Map<String, String> s3Properties() {
return SingletonContainers.LocalStack.s3Properties();
return Map.of(
ENDPOINT, LocalStack.s3Endpoint(),
CLIENT_REGION, LocalStack.region(),
ACCESS_KEY_ID, LocalStack.accessKey(),
SECRET_ACCESS_KEY, LocalStack.secretAccessKey());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,46 @@
package io.deephaven.iceberg.util;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;

import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeClass
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.MinIO.init();
MinIO.init();
}

@Override
public Builder s3Instructions(final Builder builder) {
return SingletonContainers.MinIO.s3Instructions(builder);
return MinIO.s3Instructions(builder);
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.MinIO.s3AsyncClient();
return MinIO.s3AsyncClient();
}

@Override
public Map<String, String> s3Properties() {
return SingletonContainers.MinIO.s3Properties();
return Map.of(
ENDPOINT, MinIO.s3Endpoint(),
CLIENT_REGION, MinIO.region(),
ACCESS_KEY_ID, MinIO.accessKey(),
SECRET_ACCESS_KEY, MinIO.secretAccessKey());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

Expand All @@ -22,8 +21,28 @@ public Builder s3Instructions(Builder builder) {
return LocalStack.s3Instructions(builder);
}

@Override
public String s3Endpoint() {
return LocalStack.s3Endpoint();
}

@Override
public String region() {
return LocalStack.region();
}

@Override
public String accessKey() {
return LocalStack.accessKey();
}

@Override
public String secretAccessKey() {
return LocalStack.secretAccessKey();
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.LocalStack.s3AsyncClient();
return LocalStack.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.stats.util.OSUtil;
import org.junit.Assume;
import org.junit.BeforeClass;
Expand All @@ -26,8 +25,28 @@ public Builder s3Instructions(final Builder builder) {
return MinIO.s3Instructions(builder);
}

@Override
public String s3Endpoint() {
return MinIO.s3Endpoint();
}

@Override
public String region() {
return MinIO.region();
}

@Override
public String accessKey() {
return MinIO.accessKey();
}

@Override
public String secretAccessKey() {
return MinIO.secretAccessKey();
}

@Override
public S3AsyncClient s3AsyncClient() {
return SingletonContainers.MinIO.s3AsyncClient();
return MinIO.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.deephaven.engine.table.impl.select.FormulaEvaluationException;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup;
import io.deephaven.test.types.OutOfBandTest;
Expand All @@ -28,6 +29,8 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand All @@ -51,6 +54,14 @@ abstract class S3ParquetTestBase extends S3SeekableChannelTestSetup {
@Rule
public final EngineCleanup framework = new EngineCleanup();

public abstract String s3Endpoint();

public abstract String region();

public abstract String accessKey();

public abstract String secretAccessKey();

@Before
public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
super.doSetUp();
Expand Down Expand Up @@ -505,4 +516,79 @@ public void indexByLongKey() {
verifyIndexingInfoExists(fromS3, "someInt", "someLong");
verifyIndexingInfoExists(fromS3, "someLong", "someInt");
}

@Test
public void testReadWriteUsingProfile() throws IOException {
final Table table = TableTools.emptyTable(5).update("someIntColumn = (int) i");
Path tempConfigFile = null;
Path tempCredentialsFile = null;
try {
// Create temporary config and credentials file and write wrong credentials to them
tempConfigFile = Files.createTempFile("config", ".tmp");
final String configData = "[profile test-user]\nregion = wrong-region";
Files.write(tempConfigFile, configData.getBytes());

tempCredentialsFile = Files.createTempFile("credentials", ".tmp");
final String credentialsData = "[test-user]\naws_access_key_id = foo\naws_secret_access_key = bar";
Files.write(tempCredentialsFile, credentialsData.getBytes());

final S3Instructions s3Instructions = S3Instructions.builder()
.readTimeout(Duration.ofSeconds(3))
.endpointOverride(s3Endpoint())
.profileName("test-user")
.credentialsFilePath(tempCredentialsFile.toString())
.configFilePath(tempConfigFile.toString())
.credentials(Credentials.profile())
.build();
final ParquetInstructions instructions = ParquetInstructions.builder()
.setSpecialInstructions(s3Instructions)
.build();
try {
final URI uri = uri("table1.parquet");
ParquetTools.writeTable(table, uri.toString(), instructions);
fail("Expected exception");
} catch (final UncheckedDeephavenException expected) {
}
} finally {
// Delete the temporary files
if (tempConfigFile != null) {
Files.deleteIfExists(tempConfigFile);
}
if (tempCredentialsFile != null) {
Files.delete(tempCredentialsFile);
}
}

try {
// Create temporary config and credentials file and write correct credentials and region to them
tempConfigFile = Files.createTempFile("config", ".tmp");
final String configData = "[profile test-user]\nregion = " + region();
Files.write(tempConfigFile, configData.getBytes());

tempCredentialsFile = Files.createTempFile("credentials", ".tmp");
final String credentialsData = "[test-user]\naws_access_key_id = " + accessKey() +
"\naws_secret_access_key = " + secretAccessKey();
Files.write(tempCredentialsFile, credentialsData.getBytes());

final S3Instructions s3Instructions = S3Instructions.builder()
.readTimeout(Duration.ofSeconds(3))
.endpointOverride(s3Endpoint())
.profileName("test-user")
.credentialsFilePath(tempCredentialsFile.toString())
.configFilePath(tempConfigFile.toString())
.credentials(Credentials.profile())
.build();
final ParquetInstructions instructions = ParquetInstructions.builder()
.setSpecialInstructions(s3Instructions)
.build();
final URI uri = uri("table2.parquet");
ParquetTools.writeTable(table, uri.toString(), instructions);
final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions);
assertTableEquals(table, fromS3);
} finally {
// Delete the temporary files
Files.delete(tempConfigFile);
Files.delete(tempCredentialsFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
//
package io.deephaven.extensions.s3;

import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

enum AnonymousCredentials implements AwsSdkV2Credentials {
ANONYMOUS_CREDENTIALS;

@Override
public AwsCredentialsProvider awsV2CredentialsProvider() {
public AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) {
return AnonymousCredentialsProvider.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
//
package io.deephaven.extensions.s3;

import io.deephaven.util.annotations.InternalUseOnly;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

interface AwsSdkV2Credentials extends Credentials {
@InternalUseOnly
public interface AwsSdkV2Credentials extends Credentials {

AwsCredentialsProvider awsV2CredentialsProvider();
/**
* Get the AWS credentials provider based on the given instructions.
*/
AwsCredentialsProvider awsV2CredentialsProvider(S3Instructions instructions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.annotations.SimpleStyle;
import org.immutables.value.Value;
import org.immutables.value.Value.Immutable;
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -29,7 +30,7 @@ static BasicCredentials of(final String accessKeyId, final String secretAccessKe
abstract String secretAccessKey();

@Override
public final AwsCredentialsProvider awsV2CredentialsProvider() {
public final AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId(), secretAccessKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,37 @@
//
package io.deephaven.extensions.s3;

import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;

public interface Credentials {

/**
* Default credentials provider that looks for credentials at a number of locations as described in
* {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
* Default credentials provider used by Deephaven which resolves credentials in the following order:
* <ol>
* <li>If a profile name, config file path, or credentials file path is provided, use
* {@link ProfileCredentialsProvider}</li>
* <li>If not, check all places mentioned in {@link DefaultCredentialsProvider} and fall back to
* {@link AnonymousCredentialsProvider}</li>
* </ol>
*
* @see ProfileCredentialsProvider
* @see DefaultCredentialsProvider
* @see AnonymousCredentialsProvider
*/
static Credentials resolving() {
return ResolvingCredentials.INSTANCE;
}

/**
* Default credentials provider used by the AWS SDK that looks for credentials at a number of locations as described
* in {@link DefaultCredentialsProvider}
*
* @see DefaultCredentialsProvider
*/
static Credentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
return DefaultCredentials.INSTANCE;
}

/**
Expand All @@ -33,4 +52,13 @@ static Credentials basic(final String accessKeyId, final String secretAccessKey)
static Credentials anonymous() {
return AnonymousCredentials.ANONYMOUS_CREDENTIALS;
}

/**
* Profile specific credentials that uses configuration and credentials files.
*
* @see ProfileCredentialsProvider
*/
static Credentials profile() {
return ProfileCredentials.INSTANCE;
}
}
Loading

0 comments on commit 15ff4f5

Please sign in to comment.