From 7fa0e3a4cab63394e0c8feb8429b8c9d0e866610 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 9 Oct 2024 22:23:18 +0000 Subject: [PATCH] Fix ITs --- .../streaming/internal/BlobBuilder.java | 8 +--- .../internal/ClientBufferParameters.java | 4 +- .../streaming/internal/FlushServiceTest.java | 8 +--- .../SnowflakeStreamingIngestClientTest.java | 16 ++++---- .../datatypes/AbstractDataTypeTest.java | 37 ++++++++++++++++++- .../internal/datatypes/BinaryIT.java | 2 +- .../internal/datatypes/DateTimeIT.java | 2 +- .../internal/datatypes/IcebergDateTimeIT.java | 20 +++++++++- .../datatypes/IcebergLogicalTypesIT.java | 19 +++++++++- .../datatypes/IcebergNumericTypesIT.java | 19 +++++++++- .../internal/datatypes/IcebergStringIT.java | 19 +++++++++- .../datatypes/IcebergStructuredIT.java | 19 +++++++++- .../internal/datatypes/LogicalTypesIT.java | 2 +- .../streaming/internal/datatypes/NullIT.java | 2 +- .../internal/datatypes/NumericTypesIT.java | 2 +- .../internal/datatypes/SemiStructuredIT.java | 3 +- .../internal/datatypes/StringsIT.java | 2 +- .../streaming/internal/it/ColumnNamesIT.java | 2 +- 18 files changed, 148 insertions(+), 38 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 487bc5cbe..3e1de452a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -69,12 +69,8 @@ static Blob constructBlobAndMetadata( List>> blobData, Constants.BdecVersion bdecVersion, InternalParameterProvider internalParameterProvider) - throws IOException, - NoSuchPaddingException, - NoSuchAlgorithmException, - InvalidAlgorithmParameterException, - InvalidKeyException, - IllegalBlockSizeException, + throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, + InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { List chunksMetadataList = new ArrayList<>(); List chunksDataList = new ArrayList<>(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 5699c56f5..9009642b3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -47,9 +47,7 @@ private ClientBufferParameters( this.isIcebergMode = isIcebergMode; } - /** - * @param clientInternal reference to the client object where the relevant parameters are set - */ + /** @param clientInternal reference to the client object where the relevant parameters are set */ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) { this.maxChunkSizeInBytes = clientInternal != null diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index bd2fc0043..9043d2ff5 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -1231,12 +1231,8 @@ public void testShutDown() throws Exception { @Test public void testEncryptionDecryption() - throws InvalidAlgorithmParameterException, - NoSuchPaddingException, - IllegalBlockSizeException, - NoSuchAlgorithmException, - BadPaddingException, - InvalidKeyException { + throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, + NoSuchAlgorithmException, BadPaddingException, InvalidKeyException { byte[] data = "testEncryptionDecryption".getBytes(StandardCharsets.UTF_8); String encryptionKey = Base64.getEncoder().encodeToString("encryption_key".getBytes(StandardCharsets.UTF_8)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index f4e835772..c468f03ea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -133,7 +133,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -150,7 +150,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -167,7 +167,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -184,7 +184,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); } @Test @@ -382,7 +382,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -445,7 +445,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -499,7 +499,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelMetadata channelMetadata = ChannelMetadata.builder() @@ -1266,7 +1266,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 3c58fb000..a1dfc6e2c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -66,19 +66,52 @@ public abstract class AbstractDataTypeTest { protected static final ObjectMapper objectMapper = new ObjectMapper(); @Parameters(name = "{index}: {0}") - public static Object[] compressionAlgorithms() { + public static Object[] parameters() { return new Object[] {"GZIP", "ZSTD"}; } @Parameter public String compressionAlgorithm; - public void before(boolean isIceberg) throws Exception { + public void before() throws Exception { + setUp( + false /* isIceberg */, + compressionAlgorithm, + Constants.IcebergSerializationPolicy.NON_ICEBERG); + } + + public void beforeIceberg( + String compressionAlgorithm, Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { + setUp(true /* isIceberg */, compressionAlgorithm, serializationPolicy); + } + + protected void setUp( + boolean isIceberg, + String compressionAlgorithm, + Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier()); conn = TestUtils.getConnection(true); conn.createStatement().execute(String.format("create or replace database %s;", databaseName)); conn.createStatement().execute(String.format("use database %s;", databaseName)); conn.createStatement().execute(String.format("use schema %s;", schemaName)); + switch (serializationPolicy) { + case COMPATIBLE: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';", + schemaName)); + break; + case OPTIMIZED: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", schemaName)); + break; + } + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java index a379f27ad..9f3f69751 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java @@ -7,7 +7,7 @@ public class BinaryIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java index 7695a5a12..7eabb40e6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java @@ -21,7 +21,7 @@ public class DateTimeIT extends AbstractDataTypeTest { @Before public void setup() throws Exception { - super.before(false); + super.before(); // Set to a random time zone not to interfere with any of the tests conn.createStatement().execute("alter session set timezone = 'America/New_York';"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index 3fe53aed5..ddb8f278b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -10,18 +10,36 @@ import java.time.OffsetTime; import java.time.ZoneOffset; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergDateTimeIT extends AbstractDataTypeTest { + + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java index 33e062d72..985c486a6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java @@ -1,18 +1,35 @@ package net.snowflake.ingest.streaming.internal.datatypes; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergLogicalTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index b8df92228..1734b1d71 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -2,18 +2,35 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergNumericTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java index 9ee7788a1..d30be6d0b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java @@ -2,6 +2,7 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.lang3.StringUtils; @@ -9,12 +10,28 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStringIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 6bcc34635..3ca84f4e4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -7,18 +7,35 @@ import java.util.UUID; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStructuredIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java index 526d4e17d..721ce3f52 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java @@ -8,7 +8,7 @@ public class LogicalTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java index 23e933c14..5d1843aa1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java @@ -7,7 +7,7 @@ public class NullIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java index ebdb37015..e3d5300a0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java @@ -8,7 +8,7 @@ public class NumericTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java index b2eaa8d48..5905f92d9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java @@ -22,8 +22,9 @@ public class SemiStructuredIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } + // TODO SNOW-664249: There is a few-byte mismatch between the value sent by the user and its // server-side representation. Validation leaves a small buffer for this difference. private static final int MAX_ALLOWED_LENGTH = 16 * 1024 * 1024 - 64; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java index acd9b2283..e11b82005 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java @@ -19,7 +19,7 @@ public class StringsIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java index d8dbcc72f..fb253e5ea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java @@ -23,7 +23,7 @@ public class ColumnNamesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test