diff --git a/README.md b/README.md
index 5027f3216..fcf935d51 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,7 @@ The Snowflake Ingest Service SDK depends on the following libraries:
* snowflake-jdbc (3.13.30 to 3.13.33)
* slf4j-api
+* com.github.luben:zstd-jni (1.5.0-1)
These dependencies will be fetched automatically by build systems like Maven or Gradle. If you don't build your project
using a build system, please make sure these dependencies are on the classpath.
diff --git a/pom.xml b/pom.xml
index 23e6243f4..47a1663d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,7 +356,6 @@
-
com.fasterxml.jackson.core
jackson-annotations
@@ -389,7 +388,6 @@
-
com.google.code.findbugs
jsr305
@@ -473,6 +471,12 @@
org.slf4j
slf4j-api
+        <dependency>
+          <groupId>com.github.luben</groupId>
+          <artifactId>zstd-jni</artifactId>
+          <version>1.5.0-1</version>
+          <scope>runtime</scope>
+        </dependency>
com.google.protobuf
protobuf-java
@@ -954,6 +958,7 @@
                   <include>net.snowflake:snowflake-jdbc</include>
                   <include>org.slf4j:slf4j-api</include>
+                  <include>com.github.luben:zstd-jni</include>
@@ -1034,10 +1039,6 @@
com.ctc
${shadeBase}.com.ctc
-                <relocation>
-                  <pattern>com.github.luben</pattern>
-                  <shadedPattern>${shadeBase}.com.github.luben</shadedPattern>
-                </relocation>
com.thoughtworks
${shadeBase}.com.thoughtworks
diff --git a/public_pom.xml b/public_pom.xml
index 9ef308a08..fb03684cf 100644
--- a/public_pom.xml
+++ b/public_pom.xml
@@ -48,5 +48,11 @@
1.7.36
compile
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.5.0-1</version>
+      <scope>runtime</scope>
+    </dependency>
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index a22bc9f21..404ec3851 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -119,7 +119,8 @@ public static BdecVersion fromInt(int val) {
* CompressionCodecName, but we want to control and allow only specific values of that.
*/
public enum BdecParquetCompression {
- GZIP;
+ GZIP,
+ ZSTD;
public CompressionCodecName getCompressionCodec() {
return CompressionCodecName.fromConf(this.name());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index 1ee6b9542..a32b3a25d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -308,6 +308,17 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
Constants.BdecParquetCompression.GZIP,
parameterProvider.getBdecParquetCompressionAlgorithm());
});
+    List<String> zstdValues = Arrays.asList("ZSTD", "zstd", "Zstd", "zStd");
+ zstdValues.forEach(
+ v -> {
+ Properties prop = new Properties();
+          Map<String, Object> parameterMap = getStartingParameterMap();
+ parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
+ ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+ Assert.assertEquals(
+ Constants.BdecParquetCompression.ZSTD,
+ parameterProvider.getBdecParquetCompressionAlgorithm());
+ });
}
@Test
@@ -322,7 +333,7 @@ public void testInvalidCompressionAlgorithm() {
} catch (IllegalArgumentException e) {
Assert.assertEquals(
"Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = 'invalid_comp', allowed values are"
- + " [GZIP]",
+ + " [GZIP, ZSTD]",
e.getMessage());
}
}
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
index 7994ff7bf..62494037c 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
@@ -2,6 +2,7 @@
import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -17,8 +18,13 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
/** Ingest large amount of rows. */
+@RunWith(Parameterized.class)
public class StreamingIngestBigFilesIT {
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
@@ -29,6 +35,14 @@ public class StreamingIngestBigFilesIT {
private Connection jdbcConnection;
private String testDb;
+ @Parameters(name = "{index}: {0}")
+ public static Object[] compressionAlgorithms() {
+ return new Object[] {"GZIP", "ZSTD"};
+ }
+
+ @Parameter
+ public String compressionAlgorithm;
+
@Before
public void beforeAll() throws Exception {
testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -51,6 +65,7 @@ public void beforeAll() throws Exception {
if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
prop.setProperty(ROLE, "ACCOUNTADMIN");
}
+ prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client =
         (SnowflakeStreamingIngestClientInternal<?>)
SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
index 3609b8e5e..38769e0b1 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
@@ -3,6 +3,7 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.COMPRESS_BLOB_TWICE;
import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -45,10 +46,15 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
/** Example streaming ingest sdk integration test */
+@RunWith(Parameterized.class)
public class StreamingIngestIT {
private static final String TEST_TABLE = "STREAMING_INGEST_TEST_TABLE";
private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
@@ -64,6 +70,14 @@ public class StreamingIngestIT {
private Connection jdbcConnection;
private String testDb;
+ @Parameters(name = "{index}: {0}")
+ public static Object[] compressionAlgorithms() {
+ return new Object[] {"GZIP", "ZSTD"};
+ }
+
+ @Parameter
+ public String compressionAlgorithm;
+
@Before
public void beforeAll() throws Exception {
testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -90,7 +104,7 @@ public void beforeAll() throws Exception {
// Test without role param
prop = TestUtils.getProperties(Constants.BdecVersion.THREE, true);
-
+ prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client =
         (SnowflakeStreamingIngestClientInternal<?>)
SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
index f449be2d5..b78473a11 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -22,11 +22,17 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.Constants;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
import net.snowflake.ingest.utils.SFException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+@RunWith(Parameterized.class)
public abstract class AbstractDataTypeTest {
private static final String SOURCE_COLUMN_NAME = "source";
private static final String VALUE_COLUMN_NAME = "value";
@@ -56,6 +62,14 @@ public abstract class AbstractDataTypeTest {
private SnowflakeStreamingIngestClient client;
private static final ObjectMapper objectMapper = new ObjectMapper();
+ @Parameters(name = "{index}: {0}")
+ public static Object[] compressionAlgorithms() {
+ return new Object[] {"GZIP", "ZSTD"};
+ }
+
+ @Parameter
+ public String compressionAlgorithm;
+
@Before
public void before() throws Exception {
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
@@ -70,6 +84,7 @@ public void before() throws Exception {
if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
+ props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build();
}