diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index a59a259799f..e2c6638d5ce 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -27,7 +27,7 @@
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
-import io.deephaven.extensions.s3.AwsCredentials;
+import io.deephaven.extensions.s3.Credentials;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
@@ -583,13 +583,13 @@ public void testArrayColumns() {
public void readSampleParquetFilesFromS3Test1() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
- .awsRegionName("us-east-1")
+ .regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
- .credentials(AwsCredentials.defaultCredentials())
+ .credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
@@ -621,7 +621,7 @@ public void readSampleParquetFilesFromS3Test1() {
public void readSampleParquetFilesFromS3Test2() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
- .awsRegionName("us-east-2")
+ .regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle
index 45b356bb34c..a3d643c3a6c 100644
--- a/extensions/s3/build.gradle
+++ b/extensions/s3/build.gradle
@@ -18,6 +18,38 @@ dependencies {
compileOnly depAnnotations
+ // For OSUtil
+ testImplementation project(':Stats')
+
Classpaths.inheritAutoService(project)
Classpaths.inheritImmutables(project)
+
+ Classpaths.inheritJUnitPlatform(project)
+ Classpaths.inheritAssertJ(project)
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+
+ testImplementation "org.testcontainers:testcontainers:1.19.4"
+ testImplementation "org.testcontainers:junit-jupiter:1.19.4"
+ testImplementation "org.testcontainers:localstack:1.19.4"
+ testImplementation "org.testcontainers:minio:1.19.4"
+
+ testRuntimeOnly project(':test-configs')
+ testRuntimeOnly project(':log-to-slf4j')
+ Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}
+
+test {
+ useJUnitPlatform {
+ excludeTags("testcontainers")
+ }
+}
+
+tasks.register('testOutOfBand', Test) {
+ useJUnitPlatform {
+ includeTags("testcontainers")
+ }
+ systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image')
+ systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image')
+}
+
diff --git a/extensions/s3/gradle.properties b/extensions/s3/gradle.properties
index c186bbfdde1..e170c13d14b 100644
--- a/extensions/s3/gradle.properties
+++ b/extensions/s3/gradle.properties
@@ -1 +1,5 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC
+
+# TODO(deephaven-core#5115): EPIC: Dependency management
+testcontainers.localstack.image=localstack/localstack:3.1.0
+testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsCredentials.java
deleted file mode 100644
index 5318ec540af..00000000000
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsCredentials.java
+++ /dev/null
@@ -1,15 +0,0 @@
-/**
- * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
- */
-package io.deephaven.extensions.s3;
-
-public interface AwsCredentials {
-
- static AwsCredentials defaultCredentials() {
- return DefaultCredentials.DEFAULT_CREDENTIALS;
- }
-
- static AwsCredentials basicCredentials(final String awsAccessKeyId, final String awsSecretAccessKey) {
- return BasicCredentials.of(awsAccessKeyId, awsSecretAccessKey);
- }
-}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java
index 04f58eff1df..60b5cbfbde6 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/AwsSdkV2Credentials.java
@@ -5,7 +5,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-interface AwsSdkV2Credentials extends AwsCredentials {
+interface AwsSdkV2Credentials extends Credentials {
- AwsCredentialsProvider awsCredentialsProvider();
+ AwsCredentialsProvider awsV2CredentialsProvider();
}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java
index 3459ef9ea6b..c7acd7dad99 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BasicCredentials.java
@@ -11,25 +11,25 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
/**
- * AWS credentials provider that uses access key and secret key provided at construction.
+ * Basic credentials that uses access key id and secret access key provided at construction.
*/
@Immutable
@SimpleStyle
abstract class BasicCredentials implements AwsSdkV2Credentials {
- static BasicCredentials of(final String awsAccessKeyId, final String awsSecretAccessKey) {
- return ImmutableBasicCredentials.of(awsAccessKeyId, awsSecretAccessKey);
+ static BasicCredentials of(final String accessKeyId, final String secretAccessKey) {
+ return ImmutableBasicCredentials.of(accessKeyId, secretAccessKey);
}
@Value.Parameter
- abstract String awsAccessKeyId();
+ abstract String accessKeyId();
@Value.Redacted
@Value.Parameter
- abstract String awsSecretAccessKey();
+ abstract String secretAccessKey();
- public AwsCredentialsProvider awsCredentialsProvider() {
- final AwsBasicCredentials awsCreds = AwsBasicCredentials.create(awsAccessKeyId(), awsSecretAccessKey());
- return StaticCredentialsProvider.create(awsCreds);
+ @Override
+ public final AwsCredentialsProvider awsV2CredentialsProvider() {
+ return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId(), secretAccessKey()));
}
}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java
new file mode 100644
index 00000000000..12cadeaaaaf
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+public interface Credentials {
+
+ /**
+ * The default credentials.
+ *
+ * @see Default
+ * credentials provider chain
+ */
+ static Credentials defaultCredentials() {
+ return DefaultCredentials.DEFAULT_CREDENTIALS;
+ }
+
+ /**
+ * Basic credentials with the specified access key id and secret access key.
+ *
+ * @param accessKeyId the access key id, used to identify the user
+ * @param secretAccessKey the secret access key, used to authenticate the user
+ */
+ static Credentials basicCredentials(final String accessKeyId, final String secretAccessKey) {
+ return BasicCredentials.of(accessKeyId, secretAccessKey);
+ }
+}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
index 5f69f50653f..200c34a570a 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
@@ -14,7 +14,7 @@ enum DefaultCredentials implements AwsSdkV2Credentials {
DEFAULT_CREDENTIALS;
@Override
- public AwsCredentialsProvider awsCredentialsProvider() {
+ public final AwsCredentialsProvider awsV2CredentialsProvider() {
return DefaultCredentialsProvider.create();
}
}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
index 1c2239045d6..a36263ba80d 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
@@ -5,15 +5,20 @@
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.configuration.Configuration;
-import org.immutables.value.Value;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import org.immutables.value.Value.Check;
+import org.immutables.value.Value.Default;
+import org.immutables.value.Value.Immutable;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import java.net.URI;
import java.time.Duration;
+import java.util.Optional;
/**
- * This class provides instructions intended for reading and writing data to AWS S3 instances.
+ * This class provides instructions intended for reading from and writing to S3-compatible APIs. The default values
+ * documented in this class may change in the future. As such, callers may wish to explicitly set the values.
*/
-@Value.Immutable
+@Immutable
@BuildableStyle
public abstract class S3Instructions {
@@ -22,10 +27,10 @@ public abstract class S3Instructions {
private final static String MAX_FRAGMENT_SIZE_CONFIG_PARAM = "S3.maxFragmentSize";
final static int MAX_FRAGMENT_SIZE =
- Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MB
+ Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MiB
private final static int DEFAULT_FRAGMENT_SIZE = MAX_FRAGMENT_SIZE;
- private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
+ private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB
private final static int DEFAULT_MAX_CACHE_SIZE = 32;
private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);
@@ -35,14 +40,14 @@ public static Builder builder() {
}
/**
- * The AWS region name to use when reading or writing to S3.
+ * The region name to use when reading or writing to S3.
*/
- public abstract String awsRegionName();
+ public abstract String regionName();
/**
* The maximum number of concurrent requests to make to S3, defaults to {@value #DEFAULT_MAX_CONCURRENT_REQUESTS}.
*/
- @Value.Default
+ @Default
public int maxConcurrentRequests() {
return DEFAULT_MAX_CONCURRENT_REQUESTS;
}
@@ -52,35 +57,38 @@ public int maxConcurrentRequests() {
* {@value #DEFAULT_READ_AHEAD_COUNT}, which means by default, we will fetch {@value #DEFAULT_READ_AHEAD_COUNT}
* fragments in advance when reading current fragment.
*/
- @Value.Default
+ @Default
public int readAheadCount() {
return DEFAULT_READ_AHEAD_COUNT;
}
/**
- * The maximum size of each fragment to read from S3, defaults to the value of config parameter
- * {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched fragment can
- * be smaller.
+ * The maximum byte size of each fragment to read from S3, defaults to the value of config parameter
+ * {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}, or 5 MiB if unset. Must be between 8 KiB and the value of config
+ * parameter {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched
+ * fragment can be smaller.
*/
- @Value.Default
+ @Default
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}
/**
- * The maximum number of fragments to cache in memory, defaults to {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is
- * done at the deephaven layer for faster access to recently read fragments.
+ * The maximum number of fragments to cache in memory, defaults to
+ * {@code Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE)}, which is at least
+ * {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is done at the deephaven layer for faster access to recently read
+ * fragments. Must be greater than or equal to {@code 1 + readAheadCount()}.
*/
- @Value.Default
+ @Default
public int maxCacheSize() {
- return DEFAULT_MAX_CACHE_SIZE;
+ return Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE);
}
/**
* The amount of time to wait when initially establishing a connection before giving up and timing out, defaults to
* 2 seconds.
*/
- @Value.Default
+ @Default
public Duration connectionTimeout() {
return DEFAULT_CONNECTION_TIMEOUT;
}
@@ -88,34 +96,68 @@ public Duration connectionTimeout() {
/**
* The amount of time to wait when reading a fragment before giving up and timing out, defaults to 2 seconds
*/
- @Value.Default
+ @Default
public Duration readTimeout() {
return DEFAULT_READ_TIMEOUT;
}
/**
- * The credentials to use when reading or writing to S3. By default, uses {@link DefaultCredentialsProvider}.
+ * The credentials to use when reading or writing to S3. By default, uses {@link Credentials#defaultCredentials()}.
*/
- @Value.Default
- public AwsCredentials credentials() {
- return AwsCredentials.defaultCredentials();
+ @Default
+ public Credentials credentials() {
+ return Credentials.defaultCredentials();
}
- @Value.Check
+ /**
+ * The endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when
+ * connecting to non-AWS, S3-compatible APIs.
+ *
+ * @see Amazon Simple Storage Service endpoints
+ */
+ public abstract Optional endpointOverride();
+
+ public interface Builder {
+ Builder regionName(String regionName);
+
+ Builder maxConcurrentRequests(int maxConcurrentRequests);
+
+ Builder readAheadCount(int readAheadCount);
+
+ Builder fragmentSize(int fragmentSize);
+
+ Builder maxCacheSize(int maxCacheSize);
+
+ Builder connectionTimeout(Duration connectionTimeout);
+
+ Builder readTimeout(Duration connectionTimeout);
+
+ Builder credentials(Credentials credentials);
+
+ Builder endpointOverride(URI endpointOverride);
+
+ default Builder endpointOverride(String endpointOverride) {
+ return endpointOverride(URI.create(endpointOverride));
+ }
+
+ S3Instructions build();
+ }
+
+ @Check
final void boundsCheckMaxConcurrentRequests() {
if (maxConcurrentRequests() < 1) {
throw new IllegalArgumentException("maxConcurrentRequests(=" + maxConcurrentRequests() + ") must be >= 1");
}
}
- @Value.Check
+ @Check
final void boundsCheckReadAheadCount() {
if (readAheadCount() < 0) {
throw new IllegalArgumentException("readAheadCount(=" + readAheadCount() + ") must be >= 0");
}
}
- @Value.Check
+ @Check
final void boundsCheckMaxFragmentSize() {
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= " + MIN_FRAGMENT_SIZE +
@@ -127,7 +169,7 @@ final void boundsCheckMaxFragmentSize() {
}
}
- @Value.Check
+ @Check
final void boundsCheckMaxCacheSize() {
if (maxCacheSize() < readAheadCount() + 1) {
throw new IllegalArgumentException("maxCacheSize(=" + maxCacheSize() + ") must be >= 1 + " +
@@ -135,23 +177,25 @@ final void boundsCheckMaxCacheSize() {
}
}
- public interface Builder {
- Builder awsRegionName(String awsRegionName);
-
- Builder maxConcurrentRequests(int maxConcurrentRequests);
-
- Builder readAheadCount(int readAheadCount);
-
- Builder fragmentSize(int fragmentSize);
-
- Builder maxCacheSize(int maxCacheSize);
-
- Builder connectionTimeout(Duration connectionTimeout);
-
- Builder readTimeout(Duration connectionTimeout);
-
- Builder credentials(AwsCredentials credentials);
+ @Check
+ final void awsSdkV2Credentials() {
+ if (!(credentials() instanceof AwsSdkV2Credentials)) {
+ throw new IllegalArgumentException(
+ "credentials() must be created via provided io.deephaven.extensions.s3.Credentials methods");
+ }
+ }
- S3Instructions build();
+ final AwsCredentialsProvider awsV2CredentialsProvider() {
+ return ((AwsSdkV2Credentials) credentials()).awsV2CredentialsProvider();
}
+
+ // If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style
+ // requests via software.amazon.awssdk.services.s3.S3BaseClientBuilder.forcePathStyle. Originally, AWS planned to
+ // deprecate path-style requests, but that has been delayed an indefinite amount of time. In the meantime, we'll
+ // keep S3Instructions simpler.
+ // https://aws.amazon.com/blogs/storage/update-to-amazon-s3-path-deprecation-plan/
+ // @Default
+ // public boolean forcePathStyle() {
+ // return false;
+ // }
}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
index a06d4d44b24..308f44f1672 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableByteChannel.java
@@ -34,7 +34,7 @@
/**
- * {@link SeekableByteChannel} class used to fetch objects from AWS S3 buckets using an async client with the ability to
+ * {@link SeekableByteChannel} class used to fetch objects from S3 buckets using an async client with the ability to
* read ahead and cache fragments of the object.
*/
final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelProvider.ContextHolder {
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index 5f146619e18..ed23cebbc93 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -3,14 +3,14 @@
*/
package io.deephaven.extensions.s3;
-import io.deephaven.base.verify.Assert;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
@@ -19,7 +19,7 @@
import static io.deephaven.extensions.s3.S3Instructions.MAX_FRAGMENT_SIZE;
/**
- * {@link SeekableChannelsProvider} implementation that is used to fetch objects from AWS S3 instances.
+ * {@link SeekableChannelsProvider} implementation that is used to fetch objects from S3 instances.
*/
final class S3SeekableChannelProvider implements SeekableChannelsProvider {
@@ -40,12 +40,12 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
.build();
// TODO(deephaven-core#5062): Add support for async client recovery and auto-close
// TODO(deephaven-core#5063): Add support for caching clients for re-use
- Assert.instanceOf(s3Instructions.credentials(), "credentials", AwsSdkV2Credentials.class);
- this.s3AsyncClient = S3AsyncClient.builder()
- .region(Region.of(s3Instructions.awsRegionName()))
+ final S3AsyncClientBuilder builder = S3AsyncClient.builder()
+ .region(Region.of(s3Instructions.regionName()))
.httpClient(asyncHttpClient)
- .credentialsProvider(((AwsSdkV2Credentials) s3Instructions.credentials()).awsCredentialsProvider())
- .build();
+ .credentialsProvider(s3Instructions.awsV2CredentialsProvider());
+ s3Instructions.endpointOverride().ifPresent(builder::endpointOverride);
+ this.s3AsyncClient = builder.build();
this.s3Instructions = s3Instructions;
}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java
new file mode 100644
index 00000000000..14704234848
--- /dev/null
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3InstructionsTest.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+public class S3InstructionsTest {
+
+ @Test
+ void defaults() {
+ final S3Instructions instructions = S3Instructions.builder().regionName("some-region").build();
+ assertThat(instructions.regionName()).isEqualTo("some-region");
+ assertThat(instructions.maxConcurrentRequests()).isEqualTo(50);
+ assertThat(instructions.readAheadCount()).isEqualTo(1);
+ assertThat(instructions.fragmentSize()).isEqualTo(5 * (1 << 20));
+ assertThat(instructions.maxCacheSize()).isEqualTo(32);
+ assertThat(instructions.connectionTimeout()).isEqualTo(Duration.ofSeconds(2));
+ assertThat(instructions.readTimeout()).isEqualTo(Duration.ofSeconds(2));
+ assertThat(instructions.credentials()).isEqualTo(Credentials.defaultCredentials());
+ assertThat(instructions.endpointOverride()).isEmpty();
+ }
+
+ @Test
+ void missingRegion() {
+ try {
+ S3Instructions.builder().build();
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessageContaining("regionName");
+ }
+ }
+
+ @Test
+ void minMaxConcurrentRequests() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .maxConcurrentRequests(1)
+ .build()
+ .maxConcurrentRequests())
+ .isEqualTo(1);
+ }
+
+ @Test
+ void tooSmallMaxConcurrentRequests() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .maxConcurrentRequests(0)
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("maxConcurrentRequests");
+ }
+ }
+
+ @Test
+ void minReadAheadCount() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .readAheadCount(0)
+ .build()
+ .readAheadCount())
+ .isZero();
+ }
+
+ @Test
+ void tooSmallReadAheadCount() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .readAheadCount(-1)
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("readAheadCount");
+ }
+ }
+
+ @Test
+ void minFragmentSize() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .fragmentSize(8 * (1 << 10))
+ .build()
+ .fragmentSize())
+ .isEqualTo(8 * (1 << 10));
+ }
+
+ @Test
+ void tooSmallFragmentSize() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .fragmentSize(8 * (1 << 10) - 1)
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("fragmentSize");
+ }
+ }
+
+ @Test
+ void maxFragmentSize() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .fragmentSize(S3Instructions.MAX_FRAGMENT_SIZE)
+ .build()
+ .fragmentSize())
+ .isEqualTo(S3Instructions.MAX_FRAGMENT_SIZE);
+ }
+
+ @Test
+ void tooBigFragmentSize() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .fragmentSize(S3Instructions.MAX_FRAGMENT_SIZE + 1)
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("fragmentSize");
+ }
+ }
+
+ @Test
+ void minMaxCacheSize() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .readAheadCount(99)
+ .maxCacheSize(100)
+ .build()
+ .maxCacheSize())
+ .isEqualTo(100);
+ }
+
+ @Test
+ void tooSmallCacheSize() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .readAheadCount(99)
+ .maxCacheSize(99)
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("maxCacheSize");
+ }
+ }
+
+ @Test
+ void basicCredentials() {
+ assertThat(S3Instructions.builder()
+ .regionName("some-region")
+ .credentials(Credentials.basicCredentials("foo", "bar"))
+ .build()
+ .credentials())
+ .isEqualTo(Credentials.basicCredentials("foo", "bar"));
+ }
+
+ @Test
+ void badCredentials() {
+ try {
+ S3Instructions.builder()
+ .regionName("some-region")
+ .credentials(new Credentials() {})
+ .build();
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessageContaining("credentials");
+ }
+ }
+}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java
new file mode 100644
index 00000000000..5027516dce8
--- /dev/null
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+
+import io.deephaven.extensions.s3.S3Instructions.Builder;
+import io.deephaven.extensions.s3.SingletonContainers.LocalStack;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@Tag("testcontainers")
+public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase {
+
+ @BeforeAll
+ static void initContainer() {
+ // ensure container is started so container startup time isn't associated with a specific test
+ LocalStack.init();
+ }
+
+ @Override
+ public Builder s3Instructions(Builder builder) {
+ return LocalStack.s3Instructions(builder);
+ }
+
+ @Override
+ public S3Client s3Client() {
+ return LocalStack.s3Client();
+ }
+}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java
new file mode 100644
index 00000000000..94266f4af1c
--- /dev/null
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+
+import io.deephaven.extensions.s3.S3Instructions.Builder;
+import io.deephaven.extensions.s3.SingletonContainers.MinIO;
+import io.deephaven.stats.util.OSUtil;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@Tag("testcontainers")
+public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase {
+
+ @BeforeAll
+ static void initContainer() {
+ // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
+ Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
+ // ensure container is started so container startup time isn't associated with a specific test
+ MinIO.init();
+ }
+
+ @Override
+ public Builder s3Instructions(Builder builder) {
+ return MinIO.s3Instructions(builder);
+ }
+
+ @Override
+ public S3Client s3Client() {
+ return MinIO.s3Client();
+ }
+}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java
new file mode 100644
index 00000000000..10692cb122e
--- /dev/null
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+
+import io.deephaven.util.channel.CachedChannelProvider;
+import io.deephaven.util.channel.SeekableChannelContext;
+import io.deephaven.util.channel.SeekableChannelsProvider;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class S3SeekableChannelTestBase {
+
+ public abstract S3Client s3Client();
+
+ public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder);
+
+ private S3Client client;
+
+ private String bucket;
+
+ private final List keys = new ArrayList<>();
+
+ @BeforeEach
+ void setUp() {
+ bucket = UUID.randomUUID().toString();
+ client = s3Client();
+ client.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
+ }
+
+ @AfterEach
+ void tearDown() {
+ for (String key : keys) {
+ client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build());
+ }
+ keys.clear();
+ client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build());
+ client.close();
+ }
+
+ @Test
+ void readEmptyFile() throws IOException {
+ putObject("empty.txt", RequestBody.empty());
+ final URI uri = uri("empty.txt");
+ final ByteBuffer buffer = ByteBuffer.allocate(1);
+ try (
+ final SeekableChannelsProvider providerImpl = providerImpl(uri);
+ final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
+ final SeekableChannelContext context = provider.makeContext();
+ final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
+ assertThat(readChannel.read(buffer)).isEqualTo(-1);
+ }
+ }
+
+ @Test
+ void read32MiB() throws IOException {
+ final int numBytes = 33554432;
+ putObject("32MiB.bin", RequestBody.fromInputStream(new InputStream() {
+ @Override
+ public int read() {
+ return 42;
+ }
+ }, numBytes));
+ final URI uri = uri("32MiB.bin");
+ final ByteBuffer buffer = ByteBuffer.allocate(1);
+ try (
+ final SeekableChannelsProvider providerImpl = providerImpl(uri);
+ final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
+ final SeekableChannelContext context = provider.makeContext();
+ final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
+ for (long p = 0; p < numBytes; ++p) {
+ assertThat(readChannel.read(buffer)).isEqualTo(1);
+ assertThat(buffer.get(0)).isEqualTo((byte) 42);
+ buffer.clear();
+ }
+ assertThat(readChannel.read(buffer)).isEqualTo(-1);
+ }
+ }
+
+ private URI uri(String key) {
+ return URI.create(String.format("s3://%s/%s", bucket, key));
+ }
+
+ private void putObject(String key, RequestBody body) {
+ client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body);
+ keys.add(key);
+ }
+
+ private SeekableChannelsProvider providerImpl(URI uri) {
+ final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin();
+ final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build();
+ return plugin.createProvider(uri, instructions);
+ }
+}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java
new file mode 100644
index 00000000000..b3b0241923d
--- /dev/null
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.extensions.s3;
+
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.MinIOContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.containers.localstack.LocalStackContainer.Service;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.net.URI;
+
+final class SingletonContainers {
+
+ // This pattern allows the respective images to be spun up as a container once per-JVM as opposed to once per-class
+ // or once per-test.
+ // https://java.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers
+ // https://testcontainers.com/guides/testcontainers-container-lifecycle/#_using_singleton_containers
+
+ static final class LocalStack {
+ private static final LocalStackContainer LOCALSTACK_S3 =
+ new LocalStackContainer(DockerImageName.parse(System.getProperty("testcontainers.localstack.image")))
+ .withServices(Service.S3);
+ static {
+ LOCALSTACK_S3.start();
+ }
+
+ static void init() {
+ // no-op, ensures this class is initialized
+ }
+
+ static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) {
+ return builder
+ .endpointOverride(LOCALSTACK_S3.getEndpoint())
+ .regionName(LOCALSTACK_S3.getRegion())
+ .credentials(
+ Credentials.basicCredentials(LOCALSTACK_S3.getAccessKey(),
+ LOCALSTACK_S3.getSecretKey()));
+ }
+
+ static S3Client s3Client() {
+ return S3Client
+ .builder()
+ .endpointOverride(LOCALSTACK_S3.getEndpoint())
+ .region(Region.of(LOCALSTACK_S3.getRegion()))
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(LOCALSTACK_S3.getAccessKey(), LOCALSTACK_S3.getSecretKey())))
+ .build();
+ }
+ }
+
+ static final class MinIO {
+ // MINIO_DOMAIN is set so MinIO will accept virtual-host style requests; see virtual-host style implementation
+ // comments in S3Instructions.
+ // https://min.io/docs/minio/linux/reference/minio-server/settings/core.html#domain
+ private static final MinIOContainer MINIO =
+ new MinIOContainer(DockerImageName.parse(System.getProperty("testcontainers.minio.image")))
+ .withEnv("MINIO_DOMAIN", DockerClientFactory.instance().dockerHostIpAddress());
+ static {
+ MINIO.start();
+ }
+
+ static void init() {
+ // no-op, ensures this class is initialized
+ }
+
+ static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) {
+ return builder
+ .endpointOverride(URI.create(MINIO.getS3URL()))
+ .regionName(Region.AWS_GLOBAL.id())
+ .credentials(Credentials.basicCredentials(MINIO.getUserName(), MINIO.getPassword()));
+ }
+
+ static S3Client s3Client() {
+ return S3Client
+ .builder()
+ .endpointOverride(URI.create(MINIO.getS3URL()))
+ .region(Region.AWS_GLOBAL)
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(MINIO.getUserName(), MINIO.getPassword())))
+ .build();
+ }
+ }
+}
diff --git a/py/server/deephaven/experimental/s3.py b/py/server/deephaven/experimental/s3.py
index ad186285ae5..62a3f9db6bd 100644
--- a/py/server/deephaven/experimental/s3.py
+++ b/py/server/deephaven/experimental/s3.py
@@ -14,27 +14,27 @@
# If we move S3 to a permanent module, we should remove this try/except block and just import the types directly.
try:
- _JAwsCredentials = jpy.get_type("io.deephaven.extensions.s3.AwsCredentials")
+ _JCredentials = jpy.get_type("io.deephaven.extensions.s3.Credentials")
_JS3Instructions = jpy.get_type("io.deephaven.extensions.s3.S3Instructions")
except Exception:
- _JAwsCredentials = None
+ _JCredentials = None
_JS3Instructions = None
"""
- This module is useful for reading files stored in S3.
+ This module is useful for reading files stored in S3-compatible APIs.
Importing this module requires the S3 specific deephaven extensions (artifact name deephaven-extensions-s3) to be
included in the package. This is an opt-out functionality included by default. If not included, importing this
module will fail to find the java types.
"""
class S3Instructions(JObjectWrapper):
"""
- S3Instructions provides specialized instructions for reading from AWS S3.
+ S3Instructions provides specialized instructions for reading from S3-compatible APIs.
"""
j_object_type = _JS3Instructions or type(None)
def __init__(self,
- aws_region_name: str,
+ region_name: str,
max_concurrent_requests: Optional[int] = None,
read_ahead_count: Optional[int] = None,
fragment_size: Optional[int] = None,
@@ -43,22 +43,22 @@ def __init__(self,
Duration, int, str, datetime.timedelta, np.timedelta64, pd.Timedelta, None] = None,
read_timeout: Union[
Duration, int, str, datetime.timedelta, np.timedelta64, pd.Timedelta, None] = None,
- aws_access_key_id: Optional[str] = None,
- aws_secret_access_key: Optional[str] = None):
+ access_key_id: Optional[str] = None,
+ secret_access_key: Optional[str] = None,
+ endpoint_override: Optional[str] = None):
"""
Initializes the instructions.
Args:
- aws_region_name (str): the AWS region name for reading parquet files stored in AWS S3, mandatory parameter.
- max_concurrent_requests (int): the maximum number of concurrent requests for reading parquet files stored in S3.
- default is 50.
+ region_name (str): the region name for reading parquet files, mandatory parameter.
+ max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 50.
read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current
fragment. Default to 1, which means fetch the next fragment in advance when reading the current fragment.
- fragment_size (int): the maximum size of each fragment to read from S3, defaults to 5 MB. If there are fewer
- bytes remaining in the file, the fetched fragment can be smaller.
+ fragment_size (int): the maximum size of each fragment to read, defaults to 5 MB. If there are fewer bytes
+ remaining in the file, the fetched fragment can be smaller.
max_cache_size (int): the maximum number of fragments to cache in memory while reading, defaults to 32. This
- caching is done at the deephaven layer for faster access to recently read fragments.
+ caching is done at the Deephaven layer for faster access to recently read fragments.
connection_timeout (Union[Duration, int, str, datetime.timedelta, np.timedelta64, pd.Timedelta]):
the amount of time to wait when initially establishing a connection before giving up and timing out, can
be expressed as an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or
@@ -67,24 +67,24 @@ def __init__(self,
the amount of time to wait when reading a fragment before giving up and timing out, can be expressed as
an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time
duration types. Default to 2 seconds.
- aws_access_key_id (str): the AWS access key for reading parquet files stored in AWS S3. Both access key and
- secret key must be provided to use static credentials, else default credentials will be used from
- software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.
- aws_secret_access_key (str): the AWS secret access key for reading parquet files stored in AWS S3. Both
- access key and secret key must be provided to use static credentials, else default credentials will be
- used from software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.
+ access_key_id (str): the access key for reading files. Both access key and secret access key must be provided
+ to use static credentials, else default credentials will be used.
+ secret_access_key (str): the secret access key for reading files. Both access key and secret key must be
+ provided to use static credentials, else default credentials will be used.
+ endpoint_override (str): the endpoint to connect to. Callers connecting to AWS do not typically need to set
+ this; it is most useful when connecting to non-AWS, S3-compatible APIs.
Raises:
DHError: If unable to build the instructions object.
"""
- if not _JS3Instructions or not _JAwsCredentials:
+ if not _JS3Instructions or not _JCredentials:
raise DHError(message="S3Instructions requires the S3 specific deephaven extensions to be included in "
"the package")
try:
builder = self.j_object_type.builder()
- builder.awsRegionName(aws_region_name)
+ builder.regionName(region_name)
if max_concurrent_requests is not None:
builder.maxConcurrentRequests(max_concurrent_requests)
@@ -104,12 +104,15 @@ def __init__(self,
if read_timeout is not None:
builder.readTimeout(time.to_j_duration(read_timeout))
- if ((aws_access_key_id is not None and aws_secret_access_key is None) or
- (aws_access_key_id is None and aws_secret_access_key is not None)):
- raise DHError("Either both aws_access_key_id and aws_secret_access_key must be provided or neither")
+ if ((access_key_id is not None and secret_access_key is None) or
+ (access_key_id is None and secret_access_key is not None)):
+ raise DHError("Either both access_key_id and secret_access_key must be provided or neither")
- if aws_access_key_id is not None:
- builder.credentials(_JAwsCredentials.basicCredentials(aws_access_key_id, aws_secret_access_key))
+ if access_key_id is not None:
+ builder.credentials(_JCredentials.basicCredentials(access_key_id, secret_access_key))
+
+ if endpoint_override is not None:
+ builder.endpointOverride(endpoint_override)
self._j_object = builder.build()
except Exception as e:
diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py
index 688f0e237a9..496d485b790 100644
--- a/py/server/tests/test_parquet.py
+++ b/py/server/tests/test_parquet.py
@@ -558,17 +558,17 @@ def test_read_parquet_from_s3(self):
# Fails since we have a negative read_ahead_count
with self.assertRaises(DHError):
- s3.S3Instructions(aws_region_name="us-east-1",
+ s3.S3Instructions(region_name="us-east-1",
read_ahead_count=-1,
)
# Fails since we provide the key without the secret key
with self.assertRaises(DHError):
- s3.S3Instructions(aws_region_name="us-east-1",
- aws_access_key_id="Some key without secret",
+ s3.S3Instructions(region_name="us-east-1",
+ access_key_id="Some key without secret",
)
- s3_instructions = s3.S3Instructions(aws_region_name="us-east-1",
+ s3_instructions = s3.S3Instructions(region_name="us-east-1",
read_ahead_count=1,
)
# Fails because we don't have the right credentials