From 84420bfaf809a5636632e1517479759ae7266f9c Mon Sep 17 00:00:00 2001
From: Lorenz Thiede
Date: Thu, 1 Feb 2024 10:49:53 +0100
Subject: [PATCH] SNOW-983635 Allow ZSTD compression algorithm (#654)

---
 README.md                                         |  1 +
 pom.xml                                           | 13 +++++++------
 public_pom.xml                                    |  6 ++++++
 .../net/snowflake/ingest/utils/Constants.java     |  3 ++-
 .../internal/ParameterProviderTest.java           | 13 ++++++++++++-
 .../internal/StreamingIngestBigFilesIT.java       | 15 +++++++++++++++
 .../streaming/internal/StreamingIngestIT.java     | 16 +++++++++++++++-
 .../internal/datatypes/AbstractDataTypeTest.java  | 15 +++++++++++++++
 8 files changed, 73 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 5027f3216..fcf935d51 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,7 @@ The Snowflake Ingest Service SDK depends on the following libraries:
 
 * snowflake-jdbc (3.13.30 to 3.13.33)
 * slf4j-api
+* com.github.luben:zstd-jni (1.5.0-1)
 
 These dependencies will be fetched automatically by build systems like Maven or Gradle. If you
 don't build your project using a build system, please make sure these dependencies are on the
 classpath.
diff --git a/pom.xml b/pom.xml
index 23e6243f4..47a1663d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,7 +356,6 @@
-
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
@@ -389,7 +388,6 @@
-
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
@@ -473,6 +471,12 @@
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.5.0-1</version>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -954,6 +958,7 @@
                 <exclude>net.snowflake:snowflake-jdbc</exclude>
                 <exclude>org.slf4j:slf4j-api</exclude>
+                <exclude>com.github.luben:zstd-jni</exclude>
@@ -1034,10 +1039,6 @@
             <relocation>
               <pattern>com.ctc</pattern>
               <shadedPattern>${shadeBase}.com.ctc</shadedPattern>
             </relocation>
-            <relocation>
-              <pattern>com.github.luben</pattern>
-              <shadedPattern>${shadeBase}.com.github.luben</shadedPattern>
-            </relocation>
             <relocation>
               <pattern>com.thoughtworks</pattern>
               <shadedPattern>${shadeBase}.com.thoughtworks</shadedPattern>
diff --git a/public_pom.xml b/public_pom.xml
index 9ef308a08..fb03684cf 100644
--- a/public_pom.xml
+++ b/public_pom.xml
@@ -48,5 +48,11 @@
       <version>1.7.36</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.5.0-1</version>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
 </project>
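Note that zstd-jni is dropped from the shade-plugin relocations above and carried as a plain runtime dependency in both poms instead, presumably because its bundled native JNI code cannot be safely repackaged. A quick way to confirm the library is resolvable on the runtime classpath is a compress/decompress round trip; the sketch below uses only the public Zstd byte[] API, and the class name is hypothetical, not part of the patch:

    import com.github.luben.zstd.Zstd;
    import java.nio.charset.StandardCharsets;

    public class ZstdClasspathCheck {
      public static void main(String[] args) {
        byte[] input = "zstd-jni smoke test".getBytes(StandardCharsets.UTF_8);
        // compress() triggers loading of the bundled native library
        byte[] compressed = Zstd.compress(input);
        // the simple byte[] decompress API needs the original size up front
        byte[] restored = Zstd.decompress(compressed, input.length);
        System.out.println(java.util.Arrays.equals(input, restored)); // true
      }
    }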
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index a22bc9f21..404ec3851 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -119,7 +119,8 @@ public static BdecVersion fromInt(int val) {
    * CompressionCodecName, but we want to control and allow only specific values of that.
    */
   public enum BdecParquetCompression {
-    GZIP;
+    GZIP,
+    ZSTD;
 
     public CompressionCodecName getCompressionCodec() {
       return CompressionCodecName.fromConf(this.name());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index 1ee6b9542..a32b3a25d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -308,6 +308,17 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
               Constants.BdecParquetCompression.GZIP,
               parameterProvider.getBdecParquetCompressionAlgorithm());
         });
+    List<String> zstdValues = Arrays.asList("ZSTD", "zstd", "Zstd", "zStd");
+    zstdValues.forEach(
+        v -> {
+          Properties prop = new Properties();
+          Map<String, Object> parameterMap = getStartingParameterMap();
+          parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
+          ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+          Assert.assertEquals(
+              Constants.BdecParquetCompression.ZSTD,
+              parameterProvider.getBdecParquetCompressionAlgorithm());
+        });
   }
 
   @Test
@@ -322,7 +333,7 @@ public void testInvalidCompressionAlgorithm() {
     } catch (IllegalArgumentException e) {
       Assert.assertEquals(
           "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = 'invalid_comp', allowed values are"
-              + " [GZIP]",
+              + " [GZIP, ZSTD]",
           e.getMessage());
     }
   }
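The ParameterProviderTest additions pin down two behaviors: the lookup is case-insensitive, and the rejection message enumerates the allowed values. The standalone sketch below restates that contract for illustration only; the helper is hypothetical, and the SDK's actual parsing lives in ParameterProvider:

    import java.util.Arrays;
    import net.snowflake.ingest.utils.Constants.BdecParquetCompression;

    public class CompressionParsingSketch {
      static BdecParquetCompression parse(String value) {
        try {
          // upper-casing first is what makes "zstd", "Zstd" and "zStd" all acceptable
          return BdecParquetCompression.valueOf(value.toUpperCase());
        } catch (IllegalArgumentException e) {
          throw new IllegalArgumentException(
              "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '" + value
                  + "', allowed values are " + Arrays.asList(BdecParquetCompression.values()));
        }
      }

      public static void main(String[] args) {
        System.out.println(parse("zStd")); // ZSTD
        System.out.println(parse("gzip")); // GZIP
      }
    }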
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
index 7994ff7bf..62494037c 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestBigFilesIT.java
@@ -2,6 +2,7 @@
 import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
 import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -17,8 +18,13 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /** Ingest large amount of rows. */
+@RunWith(Parameterized.class)
 public class StreamingIngestBigFilesIT {
   private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
   private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
@@ -29,6 +35,14 @@ public class StreamingIngestBigFilesIT {
   private Connection jdbcConnection;
   private String testDb;
 
+  @Parameters(name = "{index}: {0}")
+  public static Object[] compressionAlgorithms() {
+    return new Object[] {"GZIP", "ZSTD"};
+  }
+
+  @Parameter
+  public String compressionAlgorithm;
+
   @Before
   public void beforeAll() throws Exception {
     testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -51,6 +65,7 @@ public void beforeAll() throws Exception {
     if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       prop.setProperty(ROLE, "ACCOUNTADMIN");
     }
+    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
     client =
         (SnowflakeStreamingIngestClientInternal<?>)
             SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
index 3609b8e5e..38769e0b1 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java
@@ -3,6 +3,7 @@
 import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
 import static net.snowflake.ingest.utils.Constants.COMPRESS_BLOB_TWICE;
 import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
@@ -45,10 +46,15 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 /** Example streaming ingest sdk integration test */
+@RunWith(Parameterized.class)
 public class StreamingIngestIT {
   private static final String TEST_TABLE = "STREAMING_INGEST_TEST_TABLE";
   private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
@@ -64,6 +70,14 @@ public class StreamingIngestIT {
   private Connection jdbcConnection;
   private String testDb;
 
+  @Parameters(name = "{index}: {0}")
+  public static Object[] compressionAlgorithms() {
+    return new Object[] {"GZIP", "ZSTD"};
+  }
+
+  @Parameter
+  public String compressionAlgorithm;
+
   @Before
   public void beforeAll() throws Exception {
     testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
@@ -90,7 +104,7 @@ public void beforeAll() throws Exception {
 
     // Test without role param
     prop = TestUtils.getProperties(Constants.BdecVersion.THREE, true);
-
+    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
     client =
         (SnowflakeStreamingIngestClientInternal<?>)
             SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
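Both integration tests now run their full suite once per algorithm, and they opt in the same way an SDK consumer would: by setting the parameter on the Properties passed to the client builder. A minimal consumer-side sketch, assuming the usual connection properties (url, user, role, private key) are filled in and using only APIs shown in this patch:

    import java.util.Properties;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
    import net.snowflake.ingest.utils.ParameterProvider;

    public class ZstdClientExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // ... connection properties (url, user, role, private_key) go here ...
        // opt in to ZSTD; GZIP remains the behavior when the parameter is absent
        props.setProperty(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "ZSTD");
        try (SnowflakeStreamingIngestClient client =
            SnowflakeStreamingIngestClientFactory.builder("example_client")
                .setProperties(props)
                .build()) {
          // open channels and insert rows as usual
        }
      }
    }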
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
index f449be2d5..b78473a11 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -22,11 +22,17 @@
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
 import net.snowflake.ingest.utils.Constants;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
 import net.snowflake.ingest.utils.SFException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public abstract class AbstractDataTypeTest {
   private static final String SOURCE_COLUMN_NAME = "source";
   private static final String VALUE_COLUMN_NAME = "value";
@@ -56,6 +62,14 @@ public abstract class AbstractDataTypeTest {
   private SnowflakeStreamingIngestClient client;
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
+  @Parameters(name = "{index}: {0}")
+  public static Object[] compressionAlgorithms() {
+    return new Object[] {"GZIP", "ZSTD"};
+  }
+
+  @Parameter
+  public String compressionAlgorithm;
+
   @Before
   public void before() throws Exception {
     databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
@@ -70,6 +84,7 @@ public void before() throws Exception {
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
      props.setProperty(ROLE, "ACCOUNTADMIN");
     }
+    props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
    client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build();
   }
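AbstractDataTypeTest is the shared base class for the data type ITs, so parameterizing it fans every subclass's tests out across both algorithms. For readers unfamiliar with the JUnit 4 runner used throughout this patch: each value returned by the @Parameters method produces a fresh instance of the class with the @Parameter field injected, so every @Test runs once per value. A generic, self-contained illustration (not part of the patch):

    import static org.junit.Assert.assertTrue;
    import java.util.Arrays;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameter;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class ParameterizedPatternExample {
      // single-parameter shorthand: a plain Object[] is allowed since JUnit 4.12
      @Parameters(name = "{index}: {0}")
      public static Object[] algorithms() {
        return new Object[] {"GZIP", "ZSTD"};
      }

      @Parameter public String algorithm;

      @Test
      public void algorithmIsKnown() {
        assertTrue(Arrays.asList("GZIP", "ZSTD").contains(algorithm));
      }
    }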