diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java index 6683bd42db1..e1c2baa5c7d 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergLocalStackTest.java @@ -4,32 +4,41 @@ package io.deephaven.iceberg.util; import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.testlib.SingletonContainers; +import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; import java.util.Map; +import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; + public class IcebergLocalStackTest extends IcebergToolsTest { @BeforeClass public static void initContainer() { // ensure container is started so container startup time isn't associated with a specific test - SingletonContainers.LocalStack.init(); + LocalStack.init(); } @Override public Builder s3Instructions(final Builder builder) { - return SingletonContainers.LocalStack.s3Instructions(builder); + return LocalStack.s3Instructions(builder); } @Override public S3AsyncClient s3AsyncClient() { - return SingletonContainers.LocalStack.s3AsyncClient(); + return LocalStack.s3AsyncClient(); } @Override public Map s3Properties() { - return SingletonContainers.LocalStack.s3Properties(); + return Map.of( + ENDPOINT, LocalStack.s3Endpoint(), + CLIENT_REGION, LocalStack.region(), + ACCESS_KEY_ID, LocalStack.accessKey(), + SECRET_ACCESS_KEY, LocalStack.secretAccessKey()); } } diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java index 946f3eca90d..9adb98610ca 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/IcebergMinIOTest.java @@ -4,7 +4,7 @@ package io.deephaven.iceberg.util; import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.testlib.SingletonContainers; +import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; import io.deephaven.stats.util.OSUtil; import org.junit.jupiter.api.Assumptions; import org.junit.BeforeClass; @@ -12,6 +12,11 @@ import java.util.Map; +import static org.apache.iceberg.aws.AwsClientProperties.CLIENT_REGION; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ACCESS_KEY_ID; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT; +import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY; + public class IcebergMinIOTest extends IcebergToolsTest { @BeforeClass @@ -19,22 +24,26 @@ public static void initContainer() { // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()"); // ensure container is started so container startup time isn't associated with a specific test - SingletonContainers.MinIO.init(); + MinIO.init(); } @Override public Builder s3Instructions(final Builder builder) { - return 
SingletonContainers.MinIO.s3Instructions(builder); + return MinIO.s3Instructions(builder); } @Override public S3AsyncClient s3AsyncClient() { - return SingletonContainers.MinIO.s3AsyncClient(); + return MinIO.s3AsyncClient(); } @Override public Map s3Properties() { - return SingletonContainers.MinIO.s3Properties(); + return Map.of( + ENDPOINT, MinIO.s3Endpoint(), + CLIENT_REGION, MinIO.region(), + ACCESS_KEY_ID, MinIO.accessKey(), + SECRET_ACCESS_KEY, MinIO.secretAccessKey()); } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java index 796776eda1b..0b0e8775376 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetLocalStackTest.java @@ -5,7 +5,6 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; -import io.deephaven.extensions.s3.testlib.SingletonContainers; import org.junit.BeforeClass; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -22,8 +21,28 @@ public Builder s3Instructions(Builder builder) { return LocalStack.s3Instructions(builder); } + @Override + public String s3Endpoint() { + return LocalStack.s3Endpoint(); + } + + @Override + public String region() { + return LocalStack.region(); + } + + @Override + public String accessKey() { + return LocalStack.accessKey(); + } + + @Override + public String secretAccessKey() { + return LocalStack.secretAccessKey(); + } + @Override public S3AsyncClient s3AsyncClient() { - return SingletonContainers.LocalStack.s3AsyncClient(); + return LocalStack.s3AsyncClient(); } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java index 2b3cb60e609..a100f830e5d 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetMinIOTest.java @@ -5,7 +5,6 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; -import io.deephaven.extensions.s3.testlib.SingletonContainers; import io.deephaven.stats.util.OSUtil; import org.junit.Assume; import org.junit.BeforeClass; @@ -26,8 +25,28 @@ public Builder s3Instructions(final Builder builder) { return MinIO.s3Instructions(builder); } + @Override + public String s3Endpoint() { + return MinIO.s3Endpoint(); + } + + @Override + public String region() { + return MinIO.region(); + } + + @Override + public String accessKey() { + return MinIO.accessKey(); + } + + @Override + public String secretAccessKey() { + return MinIO.secretAccessKey(); + } + @Override public S3AsyncClient s3AsyncClient() { - return SingletonContainers.MinIO.s3AsyncClient(); + return MinIO.s3AsyncClient(); } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java index 8700dce1e36..afb1c44e790 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java @@ -13,6 +13,7 @@ 
import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; +import io.deephaven.extensions.s3.Credentials; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.test.types.OutOfBandTest; @@ -28,6 +29,8 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -51,6 +54,14 @@ abstract class S3ParquetTestBase extends S3SeekableChannelTestSetup { @Rule public final EngineCleanup framework = new EngineCleanup(); + public abstract String s3Endpoint(); + + public abstract String region(); + + public abstract String accessKey(); + + public abstract String secretAccessKey(); + @Before public void setUp() throws ExecutionException, InterruptedException, TimeoutException { super.doSetUp(); @@ -505,4 +516,79 @@ public void indexByLongKey() { verifyIndexingInfoExists(fromS3, "someInt", "someLong"); verifyIndexingInfoExists(fromS3, "someLong", "someInt"); } + + @Test + public void testReadWriteUsingProfile() throws IOException { + final Table table = TableTools.emptyTable(5).update("someIntColumn = (int) i"); + Path tempConfigFile = null; + Path tempCredentialsFile = null; + try { + // Create temporary config and credentials file and write wrong credentials to them + tempConfigFile = Files.createTempFile("config", ".tmp"); + final String configData = "[profile test-user]\nregion = wrong-region"; + Files.write(tempConfigFile, configData.getBytes()); + + tempCredentialsFile = Files.createTempFile("credentials", ".tmp"); + final String credentialsData = "[test-user]\naws_access_key_id = foo\naws_secret_access_key = bar"; + Files.write(tempCredentialsFile, credentialsData.getBytes()); + + final S3Instructions s3Instructions = S3Instructions.builder() + .readTimeout(Duration.ofSeconds(3)) + .endpointOverride(s3Endpoint()) + .profileName("test-user") + .credentialsFilePath(tempCredentialsFile.toString()) + .configFilePath(tempConfigFile.toString()) + .credentials(Credentials.profile()) + .build(); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions) + .build(); + try { + final URI uri = uri("table1.parquet"); + ParquetTools.writeTable(table, uri.toString(), instructions); + fail("Expected exception"); + } catch (final UncheckedDeephavenException expected) { + } + } finally { + // Delete the temporary files + if (tempConfigFile != null) { + Files.deleteIfExists(tempConfigFile); + } + if (tempCredentialsFile != null) { + Files.delete(tempCredentialsFile); + } + } + + try { + // Create temporary config and credentials file and write correct credentials and region to them + tempConfigFile = Files.createTempFile("config", ".tmp"); + final String configData = "[profile test-user]\nregion = " + region(); + Files.write(tempConfigFile, configData.getBytes()); + + tempCredentialsFile = Files.createTempFile("credentials", ".tmp"); + final String credentialsData = "[test-user]\naws_access_key_id = " + accessKey() + + "\naws_secret_access_key = " + secretAccessKey(); + Files.write(tempCredentialsFile, credentialsData.getBytes()); + + final S3Instructions s3Instructions = S3Instructions.builder() + .readTimeout(Duration.ofSeconds(3)) + .endpointOverride(s3Endpoint()) + 
.profileName("test-user") + .credentialsFilePath(tempCredentialsFile.toString()) + .configFilePath(tempConfigFile.toString()) + .credentials(Credentials.profile()) + .build(); + final ParquetInstructions instructions = ParquetInstructions.builder() + .setSpecialInstructions(s3Instructions) + .build(); + final URI uri = uri("table2.parquet"); + ParquetTools.writeTable(table, uri.toString(), instructions); + final Table fromS3 = ParquetTools.readTable(uri.toString(), instructions); + assertTableEquals(table, fromS3); + } finally { + // Delete the temporary files + Files.delete(tempConfigFile); + Files.delete(tempCredentialsFile); + } + } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AnonymousCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AnonymousCredentials.java index b637a61f770..d88ff59996e 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AnonymousCredentials.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AnonymousCredentials.java @@ -3,6 +3,7 @@ // package io.deephaven.extensions.s3; +import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -10,7 +11,7 @@ enum AnonymousCredentials implements AwsSdkV2Credentials { ANONYMOUS_CREDENTIALS; @Override - public AwsCredentialsProvider awsV2CredentialsProvider() { + public AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) { return AnonymousCredentialsProvider.create(); } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java index c7936e0daa2..6dfef882612 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java @@ -3,9 +3,14 @@ // package io.deephaven.extensions.s3; +import io.deephaven.util.annotations.InternalUseOnly; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -interface AwsSdkV2Credentials extends Credentials { +@InternalUseOnly +public interface AwsSdkV2Credentials extends Credentials { - AwsCredentialsProvider awsV2CredentialsProvider(); + /** + * Get the AWS credentials provider based on the given instructions. 
+ */ + AwsCredentialsProvider awsV2CredentialsProvider(S3Instructions instructions); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java index b058426987b..880cfd39278 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java @@ -6,6 +6,7 @@ import io.deephaven.annotations.SimpleStyle; import org.immutables.value.Value; import org.immutables.value.Value.Immutable; +import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -29,7 +30,7 @@ static BasicCredentials of(final String accessKeyId, final String secretAccessKe abstract String secretAccessKey(); @Override - public final AwsCredentialsProvider awsV2CredentialsProvider() { + public final AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) { return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId(), secretAccessKey())); } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java index 4cbcbc20498..a8cb8230262 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java @@ -3,18 +3,37 @@ // package io.deephaven.extensions.s3; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; public interface Credentials { /** - * Default credentials provider that looks for credentials at a number of locations as described in - * {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found. + * Default credentials provider used by Deephaven which resolves credentials in the following order: + *
+ * <ol>
+ * <li>If a profile name, config file path, or credentials file path is provided, use
+ * {@link ProfileCredentialsProvider}</li>
+ * <li>If not, check all places mentioned in {@link DefaultCredentialsProvider} and fall back to
+ * {@link AnonymousCredentialsProvider}</li>
+ * </ol>
+ * + * @see ProfileCredentialsProvider + * @see DefaultCredentialsProvider + * @see AnonymousCredentialsProvider + */ + static Credentials resolving() { + return ResolvingCredentials.INSTANCE; + } + + /** + * Default credentials provider used by the AWS SDK that looks for credentials at a number of locations as described + * in {@link DefaultCredentialsProvider} * * @see DefaultCredentialsProvider */ static Credentials defaultCredentials() { - return DefaultCredentials.DEFAULT_CREDENTIALS; + return DefaultCredentials.INSTANCE; } /** @@ -33,4 +52,13 @@ static Credentials basic(final String accessKeyId, final String secretAccessKey) static Credentials anonymous() { return AnonymousCredentials.ANONYMOUS_CREDENTIALS; } + + /** + * Profile specific credentials that uses configuration and credentials files. + * + * @see ProfileCredentialsProvider + */ + static Credentials profile() { + return ProfileCredentials.INSTANCE; + } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java index e3f506c975b..07f8e7c40dc 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java @@ -3,36 +3,19 @@ // package io.deephaven.extensions.s3; -import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsCredentials; +import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; /** - * Default AWS credentials provider that looks for credentials at a number of locations as described in - * {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found. 
- * - * @see DefaultCredentialsProvider + * Default credentials provider used by AWS SDK that looks for credentials at a number of locations as described in + * {@link DefaultCredentialsProvider} */ -enum DefaultCredentials implements AwsSdkV2Credentials, AwsCredentialsProvider { - DEFAULT_CREDENTIALS; - - private static final AwsCredentialsProviderChain PROVIDER_CHAIN = AwsCredentialsProviderChain.builder() - .reuseLastProviderEnabled(true) - .credentialsProviders(new AwsCredentialsProvider[] { - DefaultCredentialsProvider.create(), - AnonymousCredentialsProvider.create() - }) - .build(); - - @Override - public final AwsCredentialsProvider awsV2CredentialsProvider() { - return DEFAULT_CREDENTIALS; - } +enum DefaultCredentials implements AwsSdkV2Credentials { + INSTANCE; @Override - public AwsCredentials resolveCredentials() { - return PROVIDER_CHAIN.resolveCredentials(); + public final AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) { + return DefaultCredentialsProvider.create(); } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/ProfileCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/ProfileCredentials.java new file mode 100644 index 00000000000..c74e5cabe57 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/ProfileCredentials.java @@ -0,0 +1,25 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; + +/** + * Profile specific credentials that uses a profile file. + * + * @see ProfileCredentialsProvider + */ +enum ProfileCredentials implements AwsSdkV2Credentials { + INSTANCE; + + @Override + public final AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) { + final ProfileCredentialsProvider.Builder builder = ProfileCredentialsProvider.builder(); + instructions.profileName().ifPresent(builder::profileName); + instructions.aggregatedProfileFile().ifPresent(builder::profileFile); + return builder.build(); + } +} diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/ResolvingCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/ResolvingCredentials.java new file mode 100644 index 00000000000..b65c4ca9f14 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/ResolvingCredentials.java @@ -0,0 +1,45 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; + +/** + * Default credentials provider used by Deephaven which resolves credentials in the following order: + *
+ * <ol>
+ * <li>If a profile name, config file path, or credentials file path is provided, use
+ * {@link ProfileCredentialsProvider}</li>
+ * <li>If not, check all places mentioned in {@link DefaultCredentialsProvider} and fall back to
+ * {@link AnonymousCredentialsProvider}</li>
+ * </ol>
+ * + * @see ProfileCredentialsProvider + * @see DefaultCredentialsProvider + * @see AnonymousCredentialsProvider + */ +enum ResolvingCredentials implements AwsSdkV2Credentials { + INSTANCE; + + private static final AwsCredentialsProviderChain PROVIDER_CHAIN = AwsCredentialsProviderChain.builder() + .credentialsProviders( + DefaultCredentialsProvider.create(), + AnonymousCredentialsProvider.create()) + .reuseLastProviderEnabled(false) // Don't cache because this chain is a shared static instance + .build(); + + @Override + public final AwsCredentialsProvider awsV2CredentialsProvider(@NotNull final S3Instructions instructions) { + if (instructions.profileName().isPresent() + || instructions.configFilePath().isPresent() + || instructions.credentialsFilePath().isPresent()) { + return ProfileCredentials.INSTANCE.awsV2CredentialsProvider(instructions); + } + return PROVIDER_CHAIN; + } +} diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java index e029dff0e5e..50be6e05958 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java @@ -64,18 +64,22 @@ private static S3AsyncClientBuilder getAsyncClientBuilder(@NotNull final S3Instr b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, ensureAsyncFutureCompletionExecutor())) .httpClient(getOrBuildHttpAsyncClient(instructions)) - .overrideConfiguration(ClientOverrideConfiguration.builder() - // If we find that the STANDARD retry policy does not work well in all situations, we might - // try experimenting with ADAPTIVE retry policy, potentially with fast fail. - // .retryPolicy(RetryPolicy.builder(RetryMode.ADAPTIVE).fastFailRateLimiting(true).build()) - .retryPolicy(RetryMode.STANDARD) - .apiCallAttemptTimeout(instructions.readTimeout().dividedBy(3)) - .apiCallTimeout(instructions.readTimeout()) - // Adding a metrics publisher may be useful for debugging, but it's very verbose. - // .addMetricPublisher(LoggingMetricPublisher.create(Level.INFO, Format.PRETTY)) - .scheduledExecutorService(ensureScheduledExecutor()) - .build()) .credentialsProvider(instructions.awsV2CredentialsProvider()); + + final ClientOverrideConfiguration.Builder overrideConfiguration = ClientOverrideConfiguration.builder() + // If we find that the STANDARD retry policy does not work well in all situations, we might + // try experimenting with ADAPTIVE retry policy, potentially with fast fail. + // .retryPolicy(RetryPolicy.builder(RetryMode.ADAPTIVE).fastFailRateLimiting(true).build()) + .retryPolicy(RetryMode.STANDARD) + .apiCallAttemptTimeout(instructions.readTimeout().dividedBy(3)) + .apiCallTimeout(instructions.readTimeout()) + // Adding a metrics publisher may be useful for debugging, but it's very verbose. 
+ // .addMetricPublisher(LoggingMetricPublisher.create(Level.INFO, Format.PRETTY)) + .scheduledExecutorService(ensureScheduledExecutor()); + instructions.profileName().ifPresent(overrideConfiguration::defaultProfileName); + instructions.aggregatedProfileFile().ifPresent(overrideConfiguration::defaultProfileFile); + builder.overrideConfiguration(overrideConfiguration.build()); + if (instructions.regionName().isPresent()) { builder.region(Region.of(instructions.regionName().get())); } else { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java index 43004a6ba70..af74d3cf3ae 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java @@ -344,8 +344,8 @@ private void completeMultipartUpload() throws IOException { */ private void abortMultipartUpload() throws IOException { if (uploadId == null) { - throw new IllegalStateException("Cannot abort multipart upload for uri " + uri + " because upload ID " + - "is null"); + // We didn't start the upload, so nothing to abort + return; } final AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder() .bucket(uri.bucket().orElseThrow()) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java index e6383f465ec..bba7502e855 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java @@ -12,8 +12,11 @@ import org.immutables.value.Value.Immutable; import org.immutables.value.Value.Lazy; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.profiles.ProfileFile; import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.util.Optional; @@ -77,9 +80,9 @@ public int readAheadCount() { } /** - * The maximum byte size of each fragment to read from S3, defaults to {@value DEFAULT_FRAGMENT_SIZE}, must be - * larger than {@value MIN_FRAGMENT_SIZE}. If there are fewer bytes remaining in the file, the fetched fragment can - * be smaller. + * The maximum byte size of each fragment to read from S3 in bytes, defaults to {@value DEFAULT_FRAGMENT_SIZE}, must + * be larger than {@value MIN_FRAGMENT_SIZE}. If there are fewer bytes remaining in the file, the fetched fragment + * can be smaller. */ @Default public int fragmentSize() { @@ -106,11 +109,11 @@ public Duration readTimeout() { } /** - * The credentials to use when reading or writing to S3. By default, uses {@link Credentials#defaultCredentials()}. + * The credentials to use when reading or writing to S3. By default, uses {@link Credentials#resolving()}. */ @Default public Credentials credentials() { - return Credentials.defaultCredentials(); + return Credentials.resolving(); } /** @@ -135,6 +138,53 @@ public int numConcurrentWriteParts() { return DEFAULT_NUM_CONCURRENT_WRITE_PARTS; } + /** + * The default profile name used for configuring the default region, credentials, etc., when reading or writing to + * S3. 
If not provided, the AWS SDK picks the profile name from the 'aws.profile' system property, the "AWS_PROFILE"
+ * environment variable, or defaults to "default".
+ * <p>
+ * Setting a profile name assumes that the credentials are provided via this profile; if that is not the case, you + * must explicitly set {@link #credentials() credentials}. + * + * @see ClientOverrideConfiguration.Builder#defaultProfileName(String) + */ + public abstract Optional profileName(); + + /** + * The path to the configuration file to use for configuring the default region, credentials, etc. when reading or + * writing to S3. If not provided, the AWS SDK picks the configuration file from the 'aws.configFile' system + * property, the "AWS_CONFIG_FILE" environment variable, or defaults to "{user.home}/.aws/config". + *
+ * <p>
+ * Setting a configuration file path assumes that the credentials are provided via the configuration and credentials + * files; if that is not the case, you must explicitly set {@link #credentials() credentials}. + * + * @see ClientOverrideConfiguration.Builder#defaultProfileFile(ProfileFile) + */ + public abstract Optional configFilePath(); + + /** + * The path to the credentials file to use for configuring the default region, credentials, etc. when reading or + * writing to S3. If not provided, the AWS SDK picks the credentials file from the 'aws.credentialsFile' system + * property, the "AWS_CREDENTIALS_FILE" environment variable, or defaults to "{user.home}/.aws/credentials". + *
+ * <p>
+ * Setting a credentials file path assumes that the credentials are provided via the config and credentials files; + * if that is not the case, you must explicitly set {@link #credentials() credentials}. + * + * @see ClientOverrideConfiguration.Builder#defaultProfileFile(ProfileFile) + */ + public abstract Optional credentialsFilePath(); + + /** + * The aggregated profile file that combines the configuration and credentials files. + */ + @Lazy + Optional aggregatedProfileFile() { + if (configFilePath().isPresent() || credentialsFilePath().isPresent()) { + return Optional.of(S3Utils.aggregateProfileFile(configFilePath(), credentialsFilePath())); + } + return Optional.empty(); + } + @Override public LogOutput append(final LogOutput logOutput) { return logOutput.append(toString()); @@ -171,10 +221,24 @@ public interface Builder { Builder numConcurrentWriteParts(int numConcurrentWriteParts); - default Builder endpointOverride(String endpointOverride) { + Builder profileName(String profileName); + + Builder configFilePath(Path configFilePath); + + Builder credentialsFilePath(Path credentialsFilePath); + + default Builder endpointOverride(final String endpointOverride) { return endpointOverride(URI.create(endpointOverride)); } + default Builder configFilePath(final String configFilePath) { + return configFilePath(Path.of(configFilePath)); + } + + default Builder credentialsFilePath(final String credentialsFilePath) { + return credentialsFilePath(Path.of(credentialsFilePath)); + } + S3Instructions build(); } @@ -245,7 +309,7 @@ final void boundsCheckMaxNumConcurrentWriteParts() { } final AwsCredentialsProvider awsV2CredentialsProvider() { - return ((AwsSdkV2Credentials) credentials()).awsV2CredentialsProvider(); + return ((AwsSdkV2Credentials) credentials()).awsV2CredentialsProvider(this); } // If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Utils.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Utils.java new file mode 100644 index 00000000000..1d0b95a4e00 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Utils.java @@ -0,0 +1,51 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import org.jetbrains.annotations.NotNull; +import software.amazon.awssdk.profiles.ProfileFile; +import software.amazon.awssdk.profiles.ProfileFileLocation; + +import java.nio.file.Path; +import java.util.Optional; + +class S3Utils { + + /** + * Aggregates the profile files for configuration and credentials files into a single {@link ProfileFile}. + * + * @param configFilePath An {@link Optional} containing the path to the configuration file. If empty, the aws sdk + * default location is used. + * @param credentialsFilePath An {@link Optional} containing the path to the credentials file, If empty, the aws sdk + * default location is used. + * + * @return A {@link ProfileFile} that aggregates the configuration and credentials files. 
+ */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + static ProfileFile aggregateProfileFile( + @NotNull final Optional configFilePath, + @NotNull final Optional credentialsFilePath) { + final ProfileFile.Aggregator builder = ProfileFile.aggregator(); + + // Add the credentials file + credentialsFilePath.or(ProfileFileLocation::credentialsFileLocation) + .ifPresent(path -> addProfileFile(builder, ProfileFile.Type.CREDENTIALS, path)); + + // Add the configuration file + configFilePath.or(ProfileFileLocation::configurationFileLocation) + .ifPresent(path -> addProfileFile(builder, ProfileFile.Type.CONFIGURATION, path)); + + return builder.build(); + } + + private static void addProfileFile( + @NotNull final ProfileFile.Aggregator builder, + @NotNull final ProfileFile.Type type, + @NotNull final Path path) { + builder.addFile(ProfileFile.builder() + .type(type) + .content(path) + .build()); + } +} diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java index 4d6ef35ce4a..f2f524faae8 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java @@ -4,11 +4,16 @@ package io.deephaven.extensions.s3; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.profiles.ProfileFile; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Optional; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.fail; public class S3InstructionsTest { @@ -21,10 +26,14 @@ void defaults() { assertThat(instructions.fragmentSize()).isEqualTo(65536); assertThat(instructions.connectionTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(instructions.readTimeout()).isEqualTo(Duration.ofSeconds(2)); - assertThat(instructions.credentials()).isEqualTo(Credentials.defaultCredentials()); + assertThat(instructions.credentials()).isEqualTo(Credentials.resolving()); assertThat(instructions.writePartSize()).isEqualTo(10485760); assertThat(instructions.numConcurrentWriteParts()).isEqualTo(64); assertThat(instructions.endpointOverride()).isEmpty(); + assertThat(instructions.profileName()).isEmpty(); + assertThat(instructions.configFilePath()).isEmpty(); + assertThat(instructions.credentialsFilePath()).isEmpty(); + assertThat(instructions.aggregatedProfileFile()).isEmpty(); } @Test @@ -54,6 +63,7 @@ void testMinMaxConcurrentRequests() { .regionName("some-region") .maxConcurrentRequests(-1) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("maxConcurrentRequests"); } @@ -66,6 +76,7 @@ void tooSmallMaxConcurrentRequests() { .regionName("some-region") .maxConcurrentRequests(0) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("maxConcurrentRequests"); } @@ -88,6 +99,7 @@ void tooSmallReadAheadCount() { .regionName("some-region") .readAheadCount(-1) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("readAheadCount"); } @@ -110,6 +122,7 @@ void tooSmallFragmentSize() { .regionName("some-region") .fragmentSize(8 * (1 << 10) - 1) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { 
assertThat(e).hasMessageContaining("fragmentSize"); } @@ -132,6 +145,7 @@ void badCredentials() { .regionName("some-region") .credentials(new Credentials() {}) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("credentials"); } @@ -144,6 +158,7 @@ void tooSmallWritePartSize() { .regionName("some-region") .writePartSize(1024) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("writePartSize"); } @@ -156,6 +171,7 @@ void tooSmallNumConcurrentWriteParts() { .regionName("some-region") .numConcurrentWriteParts(0) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("numConcurrentWriteParts"); } @@ -169,8 +185,76 @@ void tooLargeNumConcurrentWriteParts() { .numConcurrentWriteParts(1001) .maxConcurrentRequests(1000) .build(); + fail("Expected exception"); } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("numConcurrentWriteParts"); } } + + @Test + void testSetProfileName() { + final Optional profileName = S3Instructions.builder() + .regionName("some-region") + .profileName("some-profile") + .build() + .profileName(); + assertThat(profileName.isPresent()).isTrue(); + assertThat(profileName.get()).isEqualTo("some-profile"); + } + + @Test + void testSetConfigFilePath() throws IOException { + // Create temporary config and credentials file and write some data to them + final Path tempConfigFile = Files.createTempFile("config", ".tmp"); + final String configData = "[default]\nregion = us-east-1\n\n[profile test-user]\nregion = us-east-2"; + Files.write(tempConfigFile, configData.getBytes()); + + final Path tempCredentialsFile = Files.createTempFile("credentials", ".tmp"); + final String credentialsData = "[default]\naws_access_key_id = foo\naws_secret_access_key = bar"; + Files.write(tempCredentialsFile, credentialsData.getBytes()); + + try { + final Optional ret = S3Instructions.builder() + .configFilePath(tempConfigFile.toString()) + .credentialsFilePath(tempCredentialsFile.toString()) + .build() + .aggregatedProfileFile(); + assertThat(ret.isPresent()).isTrue(); + final ProfileFile profileFile = ret.get(); + assertThat(profileFile.profiles().size()).isEqualTo(2); + assertThat(profileFile.profile("default").get().properties().get("region")).isEqualTo("us-east-1"); + assertThat(profileFile.profile("default").get().properties().get("aws_access_key_id")).isEqualTo("foo"); + assertThat(profileFile.profile("default").get().properties().get("aws_secret_access_key")).isEqualTo("bar"); + assertThat(profileFile.profile("test-user").get().properties().get("region")).isEqualTo("us-east-2"); + } finally { + Files.delete(tempConfigFile); + Files.delete(tempCredentialsFile); + } + } + + @Test + void testBadConfigFilePath() { + try { + S3Instructions.builder() + .configFilePath("/some/random/path") + .build() + .aggregatedProfileFile(); + fail("Expected exception"); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("/some/random/path"); + } + } + + @Test + void testBadCredentialsFilePath() { + try { + S3Instructions.builder() + .credentialsFilePath("/some/random/path") + .build() + .aggregatedProfileFile(); + fail("Expected exception"); + } catch (IllegalStateException e) { + assertThat(e).hasMessageContaining("/some/random/path"); + } + } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java 
b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java index 5d5b550089d..004f34558cd 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java @@ -55,12 +55,20 @@ public static S3AsyncClient s3AsyncClient() { .build(); } - public static Map s3Properties() { - return Map.of( - "s3.endpoint", LOCALSTACK_S3.getEndpoint().toString(), - "client.region", LOCALSTACK_S3.getRegion(), - "s3.access-key-id", LOCALSTACK_S3.getAccessKey(), - "s3.secret-access-key", LOCALSTACK_S3.getSecretKey()); + public static String s3Endpoint() { + return LOCALSTACK_S3.getEndpoint().toString(); + } + + public static String region() { + return LOCALSTACK_S3.getRegion(); + } + + public static String accessKey() { + return LOCALSTACK_S3.getAccessKey(); + } + + public static String secretAccessKey() { + return LOCALSTACK_S3.getSecretKey(); } } @@ -97,12 +105,20 @@ public static S3AsyncClient s3AsyncClient() { .build(); } - public static Map s3Properties() { - return Map.of( - "s3.endpoint", MINIO.getS3URL(), - "client.region", Region.AWS_GLOBAL.toString(), - "s3.access-key-id", MINIO.getUserName(), - "s3.secret-access-key", MINIO.getPassword()); + public static String s3Endpoint() { + return MINIO.getS3URL(); + } + + public static String region() { + return Region.AWS_GLOBAL.toString(); + } + + public static String accessKey() { + return MINIO.getUserName(); + } + + public static String secretAccessKey() { + return MINIO.getPassword(); } } } diff --git a/py/server/deephaven/experimental/s3.py b/py/server/deephaven/experimental/s3.py index f11e311cd78..47426533335 100644 --- a/py/server/deephaven/experimental/s3.py +++ b/py/server/deephaven/experimental/s3.py @@ -2,6 +2,7 @@ # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # from typing import Optional, Union +from warnings import warn import jpy @@ -23,6 +24,94 @@ included in the package. This is an opt-out functionality included by default. If not included, importing this module will fail to find the java types. """ + +class Credentials(JObjectWrapper): + """ + Credentials object for authenticating with an S3 server. + """ + j_object_type = _JCredentials + + def __init__(self, _j_object: jpy.JType): + """ + Initializes the credentials object. + + Args: + _j_object (Credentials): the Java credentials object. + """ + self._j_object = _j_object + + @property + def j_object(self) -> jpy.JType: + return self._j_object + + @classmethod + def resolving(cls) -> 'Credentials': + """ + Default credentials provider used by Deephaven which resolves credentials in the following order: + + 1. If a profile name, config file path, or credentials file path is provided via S3 Instructions, use the + profile_name for loading the credentials from the config and credentials file and fail if none is found. + + 2. Otherwise, use the default AWS SDK behavior that looks for credentials in this order: Java System Properties + (`aws.accessKeyId` and `aws.secretAccessKey`), Environment Variables (`AWS_ACCESS_KEY_ID` and + `AWS_SECRET_ACCESS_KEY`), Credential profiles file at the default location (~/.aws/credentials), or Instance + profile credentials delivered through the Amazon EC2 metadata service. If still none found, fall back to + anonymous credentials, which can only be used to read data with S3 policy set to allow anonymous access. + + Returns: + Credentials: the credentials object. 
+ """ + return cls(_JCredentials.resolving()) + + @classmethod + def default(cls) -> 'Credentials': + """ + Default credentials provider used by the AWS SDK that looks for credentials in this order: + Java System Properties (`aws.accessKeyId` and `aws.secretAccessKey`), Environment Variables (`AWS_ACCESS_KEY_ID` + and `AWS_SECRET_ACCESS_KEY`), Credential profiles file at the default location (~/.aws/credentials), and + Instance profile credentials delivered through the Amazon EC2 metadata service. + + Returns: + Credentials: the credentials object. + """ + return cls(_JCredentials.defaultCredentials()) + + @classmethod + def basic(cls, access_key_id: str, secret_access_key: str) -> 'Credentials': + """ + Basic credentials provider with the specified access key id and secret access key. + + Args: + access_key_id (str): the access key id, used to identify the user. + secret_access_key (str): the secret access key, used to authenticate the user. + + Returns: + Credentials: the credentials object. + """ + return cls(_JCredentials.basic(access_key_id, secret_access_key)) + + @classmethod + def anonymous(cls) -> 'Credentials': + """ + Anonymous credentials provider, which can only be used to read data with S3 policy set to allow anonymous access. + + Returns: + Credentials: the credentials object. + """ + return cls(_JCredentials.anonymous()) + + @classmethod + def profile(cls) -> 'Credentials': + """ + Use the profile name, config file path, or credentials file path from S3 Instructions for loading the + credentials and fail if none found. + + Returns: + Credentials: the credentials object. + """ + return cls(_JCredentials.profile()) + + class S3Instructions(JObjectWrapper): """ S3Instructions provides specialized instructions for reading from and writing to S3-compatible APIs. @@ -32,6 +121,7 @@ class S3Instructions(JObjectWrapper): def __init__(self, region_name: Optional[str] = None, + credentials: Optional[Credentials] = None, max_concurrent_requests: Optional[int] = None, read_ahead_count: Optional[int] = None, fragment_size: Optional[int] = None, @@ -42,7 +132,10 @@ def __init__(self, anonymous_access: bool = False, endpoint_override: Optional[str] = None, write_part_size: Optional[int] = None, - num_concurrent_write_parts: Optional[int] = None): + num_concurrent_write_parts: Optional[int] = None, + profile_name: Optional[str] = None, + config_file_path: Optional[str] = None, + credentials_file_path: Optional[str] = None): """ Initializes the instructions. @@ -54,36 +147,67 @@ def __init__(self, in EC2. If no region name is derived from the above chain or the derived region name is incorrect for the bucket accessed, the correct region name will be derived internally, at the cost of one additional request. + credentials (Credentials): the credentials object for authenticating to the S3 server, defaults to + Credentials.resolving(). max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 256. read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment. - fragment_size (int): the maximum size of each fragment to read, defaults to 64 KiB. If there are fewer bytes - remaining in the file, the fetched fragment can be smaller. 
- connection_timeout (DurationLike): - the amount of time to wait when initially establishing a connection before giving up and timing out, can - be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or - other time duration types. Default to 2 seconds. - read_timeout (DurationLike): - the amount of time to wait when reading a fragment before giving up and timing out, can be expressed as - an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time - duration types. Default to 2 seconds. - access_key_id (str): the access key for reading files. Both access key and secret access key must be - provided to use static credentials, else default credentials will be used. - secret_access_key (str): the secret access key for reading files. Both access key and secret key must be - provided to use static credentials, else default credentials will be used. - anonymous_access (bool): use anonymous credentials, this is useful when the S3 policy has been set to allow - anonymous access. Can't be combined with other credentials. By default, is False. + fragment_size (int): the maximum size of each fragment to read in bytes, defaults to 65536. If + there are fewer bytes remaining in the file, the fetched fragment can be smaller. + connection_timeout (DurationLike): the amount of time to wait when initially establishing a connection + before giving up and timing out. Can be expressed as an integer in nanoseconds, a time interval string, + e.g. "PT00:00:00.001" or "PT1s", or other time duration types. Default to 2 seconds. + read_timeout (DurationLike): the amount of time to wait when reading a fragment before giving up and timing + out. Can be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or + "PT1s", or other time duration types. Default to 2 seconds. + access_key_id (str): (Deprecated) the access key for reading files. Both access key and secret access key + must be provided to use static credentials. If you specify both access key and secret key, then you + cannot provide other credentials like setting anonymous_access or credentials argument. + This option is deprecated and should be replaced by setting credentials as + Credentials.basic(access_key_id, secret_access_key). + secret_access_key (str): (Deprecated) the secret access key for reading files. Both access key and secret + key must be provided to use static credentials. If you specify both access key and secret key, then you + cannot provide other credentials like setting anonymous_access or credentials argument. + This option is deprecated and should be replaced by setting credentials as + Credentials.basic(access_key_id, secret_access_key). + anonymous_access (bool): (Deprecated) use anonymous credentials, this is useful when the S3 policy has been + set to allow anonymous access. By default, is False. If you set this to True, you cannot provide other + credentials like setting access_key_id or credentials argument. + This option is deprecated and should be replaced by setting credentials as Credentials.anonymous(). endpoint_override (str): the endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. - write_part_size (int): Writes to S3 are done in parts or chunks, and this value determines the size of each - part (in bytes). The default value is 10485760 (= 10 MiB) and minimum allowed part size is 5 MiB. 
- Setting a higher value may increase throughput, but may also increase memory usage. - Note that the maximum number of parts allowed for a single file is 10,000. Therefore, for 10 MiB part - size, the maximum size of a single file that can be written is roughly 100k MiB (or about 98 GiB). - num_concurrent_write_parts (int): the maximum number of parts that can be uploaded concurrently when writing - to S3 without blocking, defaults to 64. Setting a higher value may increase throughput, but may also - increase memory usage. + write_part_size (int): The part or chunk size when writing to S3. The default is 10 MiB. The minimum allowed + part size is 5242880. Higher part size may increase throughput but also increase memory usage. Writing + a single file to S3 can be done in a maximum of 10,000 parts, so the maximum size of a single file that + can be written is about 98 GiB for the default part size. + num_concurrent_write_parts (int): the maximum number of parts or chunks that can be uploaded concurrently + when writing to S3 without blocking, defaults to 64. Setting a higher value may increase throughput, but + may also increase memory usage. + profile_name (str): the profile name used for configuring the default region, credentials, etc., when + reading or writing to S3. If not provided, the AWS SDK picks the profile name from the 'aws.profile' + system property, the "AWS_PROFILE" environment variable, or defaults to the string "default". + Setting a profile name assumes that the credentials are provided via this profile; if that is not the + case, you must explicitly set credentials using the access_key_id and secret_access_key. + config_file_path (str): the path to the configuration file to use for configuring the default region, + role_arn, output etc. when reading or writing to S3. If not provided, the AWS SDK picks the configuration + file from the 'aws.configFile' system property, the "AWS_CONFIG_FILE" environment variable, or defaults + to "{user.home}/.aws/config". + Setting a configuration file path assumes that the credentials are provided via the config and + credentials files; if that is not the case, you must explicitly set credentials using the access_key_id + and secret_access_key. + For reference on the configuration file format, check + https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html + credentials_file_path (str): the path to the credentials file to use for configuring the credentials, + region, etc. when reading or writing to S3. If not provided, the AWS SDK picks the credentials file from + the 'aws.credentialsFile' system property, the "AWS_CREDENTIALS_FILE" environment variable, or defaults + to "{user.home}/.aws/credentials". + Setting a credentials file path assumes that the credentials are provided via the config and + credentials files; if that is not the case, you must explicitly set credentials using the access_key_id + and secret_access_key. + The main difference between config_file_path and credentials_file_path is around the conventions used + in the files. For reference on the credentials file format, check + https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html Raises: DHError: If unable to build the instructions object. 
@@ -118,12 +242,28 @@ def __init__(self, (access_key_id is None and secret_access_key is not None)): raise DHError("Either both access_key_id and secret_access_key must be provided or neither") + def throw_multiple_credentials_error(credentials1: str, credentials2: str): + raise DHError(f"Only one set of credentials can be set, but found {credentials1} and {credentials2}") + + # Configure the credentials if access_key_id is not None: + warn('access_key_id is deprecated, prefer setting credentials as ' + 'Credentials.basic(access_key_id, secret_access_key)', DeprecationWarning, stacklevel=2) + # TODO(deephaven-core#6165): Delete deprecated parameters if anonymous_access: - raise DHError("Only one set of credentials may be used, requested both key and anonymous") + throw_multiple_credentials_error("access_key_id", "anonymous_access") + if credentials is not None: + throw_multiple_credentials_error("access_key_id", "credentials") builder.credentials(_JCredentials.basic(access_key_id, secret_access_key)) elif anonymous_access: + warn("anonymous_access is deprecated, prefer setting credentials as Credentials.anonymous()", + DeprecationWarning, stacklevel=2) + # TODO(deephaven-core#6165): Delete deprecated parameters + if credentials is not None: + throw_multiple_credentials_error("anonymous_access", "credentials") builder.credentials(_JCredentials.anonymous()) + elif credentials is not None: + builder.credentials(credentials.j_object) if endpoint_override is not None: builder.endpointOverride(endpoint_override) @@ -134,6 +274,15 @@ def __init__(self, if num_concurrent_write_parts is not None: builder.numConcurrentWriteParts(num_concurrent_write_parts) + if profile_name is not None: + builder.profileName(profile_name) + + if config_file_path is not None: + builder.configFilePath(config_file_path) + + if credentials_file_path is not None: + builder.credentialsFilePath(credentials_file_path) + self._j_object = builder.build() except Exception as e: raise DHError(e, "Failed to build S3 instructions") from e diff --git a/py/server/tests/test_s3_instructions.py b/py/server/tests/test_s3_instructions.py new file mode 100644 index 00000000000..ad169914117 --- /dev/null +++ b/py/server/tests/test_s3_instructions.py @@ -0,0 +1,120 @@ +# +# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +# +import jpy +import tempfile + +from tests.testbase import BaseTestCase +from deephaven import DHError +from deephaven.experimental import s3 + +_JCredentials = jpy.get_type("io.deephaven.extensions.s3.Credentials") + +class S3InstructionTest(BaseTestCase): + """ Test cases for the s3 instructions """ + + def setUp(self): + super().setUp() + + def tearDown(self): + super().tearDown() + + def test_default(self): + s3_instructions = s3.S3Instructions() + self.assertTrue(s3_instructions.j_object is not None) + self.assertTrue(s3_instructions.j_object.regionName().isEmpty()) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == _JCredentials.resolving().getClass()) + self.assertEqual(s3_instructions.j_object.maxConcurrentRequests(), 256) + self.assertEqual(s3_instructions.j_object.readAheadCount(), 32) + self.assertEqual(s3_instructions.j_object.fragmentSize(), 65536) + self.assertEqual(s3_instructions.j_object.connectionTimeout().toSeconds(), 2) + self.assertEqual(s3_instructions.j_object.readTimeout().toSeconds(), 2) + self.assertTrue(s3_instructions.j_object.endpointOverride().isEmpty()) + self.assertEqual(s3_instructions.j_object.writePartSize(), 10485760) + 
self.assertEqual(s3_instructions.j_object.numConcurrentWriteParts(), 64) + self.assertTrue(s3_instructions.j_object.profileName().isEmpty()) + self.assertTrue(s3_instructions.j_object.configFilePath().isEmpty()) + self.assertTrue(s3_instructions.j_object.credentialsFilePath().isEmpty()) + + def test_set_region_name(self): + s3_instructions = s3.S3Instructions(region_name="us-west-2") + self.assertEqual(s3_instructions.j_object.regionName().get(), "us-west-2") + + def test_set_max_concurrent_requests(self): + s3_instructions = s3.S3Instructions(max_concurrent_requests=512) + self.assertEqual(s3_instructions.j_object.maxConcurrentRequests(), 512) + + def test_set_read_ahead_count(self): + s3_instructions = s3.S3Instructions(read_ahead_count=64) + self.assertEqual(s3_instructions.j_object.readAheadCount(), 64) + + def test_set_fragment_size(self): + s3_instructions = s3.S3Instructions(fragment_size=131072) + self.assertEqual(s3_instructions.j_object.fragmentSize(), 131072) + + def test_set_connection_timeout(self): + s3_instructions = s3.S3Instructions(connection_timeout="PT10s") + self.assertEqual(s3_instructions.j_object.connectionTimeout().toSeconds(), 10) + + def test_set_read_timeout(self): + s3_instructions = s3.S3Instructions(read_timeout="PT5s") + self.assertEqual(s3_instructions.j_object.readTimeout().toSeconds(), 5) + + def test_set_endpoint_override(self): + s3_instructions = s3.S3Instructions(endpoint_override="http://localhost:9000") + self.assertEqual(s3_instructions.j_object.endpointOverride().get().toString(), "http://localhost:9000") + + def test_set_write_part_size(self): + s3_instructions = s3.S3Instructions(write_part_size=20971520) + self.assertEqual(s3_instructions.j_object.writePartSize(), 20971520) + + def test_set_num_concurrent_write_parts(self): + s3_instructions = s3.S3Instructions(num_concurrent_write_parts=128) + self.assertEqual(s3_instructions.j_object.numConcurrentWriteParts(), 128) + + def test_set_profile_name(self): + s3_instructions = s3.S3Instructions(profile_name="test-user") + self.assertEqual(s3_instructions.j_object.profileName().get(), "test-user") + + def test_set_config_file_path(self): + with tempfile.NamedTemporaryFile() as temp_config_file: + s3_instructions = s3.S3Instructions(config_file_path=temp_config_file.name) + self.assertEqual(s3_instructions.j_object.configFilePath().get().toString(), temp_config_file.name) + + def test_set_credentials_file_path(self): + with tempfile.NamedTemporaryFile() as temp_credentials_file: + s3_instructions = s3.S3Instructions(credentials_file_path=temp_credentials_file.name) + self.assertEqual(s3_instructions.j_object.credentialsFilePath().get().toString(), temp_credentials_file.name) + + def test_set_resolving_credentials(self): + s3_instructions = s3.S3Instructions(credentials=s3.Credentials.resolving()) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == _JCredentials.resolving().getClass()) + + def test_set_anonymous_access(self): + s3_instructions = s3.S3Instructions(anonymous_access=True) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == _JCredentials.anonymous().getClass()) + + s3_instructions = s3.S3Instructions(credentials=s3.Credentials.anonymous()) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == _JCredentials.anonymous().getClass()) + + def test_set_default_credentials(self): + s3_instructions = s3.S3Instructions(credentials=s3.Credentials.default()) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == 
_JCredentials.defaultCredentials().getClass()) + + def test_set_profile_credentials(self): + s3_instructions = s3.S3Instructions(credentials=s3.Credentials.profile()) + self.assertTrue(s3_instructions.j_object.credentials().getClass() == _JCredentials.profile().getClass()) + + def test_set_multiple_credentials(self): + # Only one set of credentials can be set + with self.assertRaises(DHError): + s3.S3Instructions(anonymous_access=True, access_key_id="foo", secret_access_key="bar") + self.fail("Expected ValueError") + + with self.assertRaises(DHError): + s3.S3Instructions(anonymous_access=True, credentials=s3.Credentials.resolving()) + self.fail("Expected ValueError") + + with self.assertRaises(DHError): + s3.S3Instructions(access_key_id="foo", secret_access_key="bar", credentials=s3.Credentials.resolving()) + self.fail("Expected ValueError")
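# Usage sketch: a minimal, hypothetical example of wiring up the profile-based
# credentials introduced above from Python. The profile name and file paths are
# placeholders, not values taken from this change.
from deephaven.experimental import s3

instructions = s3.S3Instructions(
    region_name="us-east-1",                            # optional; may also come from the config file
    profile_name="test-user",                           # profile resolved against the files below
    config_file_path="/path/to/aws/config",             # hypothetical path
    credentials_file_path="/path/to/aws/credentials",   # hypothetical path
    credentials=s3.Credentials.profile(),               # force profile-based credential loading
)
# `instructions` can then be passed wherever an S3Instructions / special_instructions
# argument is accepted, e.g. the Parquet and Iceberg readers exercised in the tests above.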