From 6d6ba9455ca1cfbd62b6e7e5c4b78412d8dda243 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Mon, 14 Oct 2024 14:16:44 -0700
Subject: [PATCH] SNOW-1708577 Parquet V2 support for new table format (#851)

This PR aims to ensure consistency with the server-side Iceberg table scanner and registration. It includes the following changes:

- Added support for Parquet V2 (delta encoding) when the schema's STORAGE_SERIALIZATION_POLICY is set to OPTIMIZED.
- Enabled dictionary encoding for Iceberg mode.
- Moved writerVersion defaulting from the channel constructor to the channel-construction call site in SnowflakeStreamingIngestClientInternal, instead of passing writerVersion=null into the channel.
- Removed a test-only constructor overload of SnowflakeStreamingIngestChannelInternal.
- Removed an unnecessary constructor parameter (bdecVersion) from SnowflakeStreamingIngestChannelInternal.

---------

Co-authored-by: Hitesh Madan
---
 .../streaming/internal/AbstractRowBuffer.java | 3 +
 .../streaming/internal/BlobBuilder.java | 3 +-
 .../internal/ClientBufferParameters.java | 4 +
 .../internal/OpenChannelResponse.java | 10 +++
 .../streaming/internal/ParquetFlusher.java | 13 +++-
 .../streaming/internal/ParquetRowBuffer.java | 9 ++-
 ...nowflakeStreamingIngestChannelFactory.java | 12 ++-
 ...owflakeStreamingIngestChannelInternal.java | 57 ++-------------
 ...nowflakeStreamingIngestClientInternal.java | 23 ++++--
 .../net/snowflake/ingest/utils/Constants.java | 25 ++++++-
 .../parquet/hadoop/BdecParquetWriter.java | 20 +++--
 .../streaming/internal/BlobBuilderTest.java | 13 +++-
 .../streaming/internal/ChannelCacheTest.java | 37 ++++++++--
 .../streaming/internal/FlushServiceTest.java | 19 ++++-
 .../internal/InsertRowsBenchmarkTest.java | 7 +-
 .../streaming/internal/RowBufferTest.java | 4 +
 .../SnowflakeStreamingIngestChannelTest.java | 73 ++++++++++++++++---
 .../SnowflakeStreamingIngestClientTest.java | 35 ++++++---
 .../datatypes/AbstractDataTypeTest.java | 37 +++++++++-
 .../internal/datatypes/BinaryIT.java | 2 +-
 .../internal/datatypes/DateTimeIT.java | 2 +-
 .../internal/datatypes/IcebergDateTimeIT.java | 18 ++++-
 .../datatypes/IcebergLogicalTypesIT.java | 17 ++++-
 .../datatypes/IcebergNumericTypesIT.java | 17 ++++-
 .../internal/datatypes/IcebergStringIT.java | 17 ++++-
 .../datatypes/IcebergStructuredIT.java | 17 ++++-
 .../internal/datatypes/LogicalTypesIT.java | 2 +-
 .../streaming/internal/datatypes/NullIT.java | 2 +-
 .../internal/datatypes/NumericTypesIT.java | 2 +-
 .../internal/datatypes/SemiStructuredIT.java | 3 +-
 .../internal/datatypes/StringsIT.java | 2 +-
 .../streaming/internal/it/ColumnNamesIT.java | 2 +-
 32 files changed, 389 insertions(+), 118 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 7b0adaabc..fa502f30a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -25,6 +25,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; /** * The abstract implementation of the buffer in the Streaming Ingest channel that holds the @@ -668,6 +669,7 @@ static AbstractRowBuffer createRowBuffer( ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters, OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion, TelemetryService telemetryService) {
switch (bdecVersion) { case THREE: @@ -681,6 +683,7 @@ static AbstractRowBuffer createRowBuffer( channelRuntimeState, clientBufferParameters, offsetTokenVerificationFunction, + parquetWriterVersion, telemetryService); default: throw new SFException( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 307093d3e..3e1de452a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -32,6 +32,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.hadoop.ParquetFileWriter; /** * Build a single blob file that contains file header plus data. The header will be a @@ -140,7 +141,7 @@ static Blob constructBlobAndMetadata( if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { chunkMetadataBuilder - .setMajorVersion(Constants.PARQUET_MAJOR_VERSION) + .setMajorVersion(ParquetFileWriter.CURRENT_VERSION) .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 0a9711ee8..9009642b3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -124,4 +124,8 @@ public Optional getMaxRowGroups() { public String getParquetMessageTypeName() { return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME; } + + public boolean isEnableDictionaryEncoding() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index 92f7ea8c5..0058624af 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -22,6 +22,7 @@ class OpenChannelResponse extends StreamingIngestResponse { private String encryptionKey; private Long encryptionKeyId; private FileLocationInfo icebergLocationInfo; + private String icebergSerializationPolicy; @JsonProperty("status_code") void setStatusCode(Long statusCode) { @@ -140,4 +141,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) { FileLocationInfo getIcebergLocationInfo() { return this.icebergLocationInfo; } + + @JsonProperty("iceberg_serialization_policy") + void setIcebergSerializationPolicy(String icebergSerializationPolicy) { + this.icebergSerializationPolicy = icebergSerializationPolicy; + } + + String getIcebergSerializationPolicy() { + return this.icebergSerializationPolicy; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index fcdd9cdfc..5b11996ec 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -16,6 +16,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import 
org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.MessageType; @@ -30,17 +31,23 @@ public class ParquetFlusher implements Flusher { private final Optional maxRowGroups; private final Constants.BdecParquetCompression bdecParquetCompression; + private final ParquetProperties.WriterVersion parquetWriterVersion; + private final boolean enableDictionaryEncoding; /** Construct parquet flusher from its schema. */ public ParquetFlusher( MessageType schema, long maxChunkSizeInBytes, Optional maxRowGroups, - Constants.BdecParquetCompression bdecParquetCompression) { + Constants.BdecParquetCompression bdecParquetCompression, + ParquetProperties.WriterVersion parquetWriterVersion, + boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; + this.parquetWriterVersion = parquetWriterVersion; + this.enableDictionaryEncoding = enableDictionaryEncoding; } @Override @@ -129,7 +136,9 @@ private SerializationResult serializeFromJavaObjects( firstChannelFullyQualifiedTableName, maxChunkSizeInBytes, maxRowGroups, - bdecParquetCompression); + bdecParquetCompression, + parquetWriterVersion, + enableDictionaryEncoding); rows.forEach(parquetWriter::writeRow); parquetWriter.close(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 339210f65..5e3fa1191 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -26,6 +26,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer { private final List> data; private final List> tempData; + private final ParquetProperties.WriterVersion parquetWriterVersion; + private MessageType schema; /** Construct a ParquetRowBuffer object. 
*/ @@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters, OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion, TelemetryService telemetryService) { super( onErrorOption, @@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { this.metadata = new HashMap<>(); this.data = new ArrayList<>(); this.tempData = new ArrayList<>(); + this.parquetWriterVersion = parquetWriterVersion; } /** @@ -394,6 +399,8 @@ public Flusher createFlusher() { schema, clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getMaxRowGroups(), - clientBufferParameters.getBdecParquetCompression()); + clientBufferParameters.getBdecParquetCompression(), + parquetWriterVersion, + clientBufferParameters.isEnableDictionaryEncoding()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index a56b82ed5..40542c8b0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -8,6 +8,7 @@ import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** Builds a Streaming Ingest channel for a specific Streaming Ingest client */ class SnowflakeStreamingIngestChannelFactory { @@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder { private OpenChannelRequest.OnErrorOption onErrorOption; private ZoneId defaultTimezone; private OffsetTokenVerificationFunction offsetTokenVerificationFunction; + private ParquetProperties.WriterVersion parquetWriterVersion; private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; @@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder setOffsetTokenVerificationFunction( return this; } + SnowflakeStreamingIngestChannelBuilder setParquetWriterVersion( + ParquetProperties.WriterVersion parquetWriterVersion) { + this.parquetWriterVersion = parquetWriterVersion; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -123,8 +131,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion(), - this.offsetTokenVerificationFunction); + this.offsetTokenVerificationFunction, + this.parquetWriterVersion); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4e884387b..167240ed7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -10,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.Collections; import 
java.util.HashMap; import java.util.LinkedHashMap; @@ -19,17 +18,18 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestChannel @@ -68,48 +68,6 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn private final MemoryInfoProvider memoryInfoProvider; private volatile long freeMemoryInBytes = 0; - /** - * Constructor for TESTING ONLY which allows us to set the test mode - * - * @param name - * @param dbName - * @param schemaName - * @param tableName - * @param offsetToken - * @param channelSequencer - * @param rowSequencer - * @param client - */ - SnowflakeStreamingIngestChannelInternal( - String name, - String dbName, - String schemaName, - String tableName, - String offsetToken, - Long channelSequencer, - Long rowSequencer, - SnowflakeStreamingIngestClientInternal client, - String encryptionKey, - Long encryptionKeyId, - OpenChannelRequest.OnErrorOption onErrorOption, - ZoneOffset defaultTimezone) { - this( - name, - dbName, - schemaName, - tableName, - offsetToken, - channelSequencer, - rowSequencer, - client, - encryptionKey, - encryptionKeyId, - onErrorOption, - defaultTimezone, - client.getParameterProvider().getBlobFormatVersion(), - null); - } - /** Default constructor */ SnowflakeStreamingIngestChannelInternal( String name, @@ -119,13 +77,13 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String endOffsetToken, Long channelSequencer, Long rowSequencer, - SnowflakeStreamingIngestClientInternal client, + @Nonnull SnowflakeStreamingIngestClientInternal client, String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion, - OffsetTokenVerificationFunction offsetTokenVerificationFunction) { + OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion) { this.isClosed = false; this.owningClient = client; @@ -147,13 +105,14 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn AbstractRowBuffer.createRowBuffer( onErrorOption, defaultTimezone, - bdecVersion, + client.getParameterProvider().getBlobFormatVersion(), getFullyQualifiedName(), this::collectRowSize, channelState, new ClientBufferParameters(owningClient), offsetTokenVerificationFunction, - owningClient == null ? 
null : owningClient.getTelemetryService()); + parquetWriterVersion, + owningClient.getTelemetryService()); this.tableColumns = new HashMap<>(); logger.logInfo( "Channel={} created for table={}", diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 988189475..7751499f1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -71,6 +71,7 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestClient. The client internally @@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } - if (isIcebergMode - && response.getTableColumns().stream() - .anyMatch(c -> c.getSourceIcebergDataType() == null)) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set"); + if (isIcebergMode) { + if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set"); + } + + if (response.getIcebergSerializationPolicy() == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Iceberg Table's open channel response does not have serialization policy set."); + } } logger.logInfo( @@ -386,6 +393,12 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .setParquetWriterVersion( + isIcebergMode + ? Constants.IcebergSerializationPolicy.valueOf( + response.getIcebergSerializationPolicy()) + .toParquetWriterVersion() + : ParquetProperties.WriterVersion.PARQUET_1_0) .build(); // Setup the row buffer schema diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index cb4bacf92..7198c7669 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.utils; import java.util.Arrays; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** Contains all the constants needed for Streaming Ingest */ @@ -71,9 +72,31 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; - public static final int PARQUET_MAJOR_VERSION = 1; public static final int PARQUET_MINOR_VERSION = 0; + /** + * Iceberg table serialization policy. Use v2 parquet writer for optimized serialization, + * otherwise v1. 
+ */ + public enum IcebergSerializationPolicy { + COMPATIBLE, + OPTIMIZED; + + public ParquetProperties.WriterVersion toParquetWriterVersion() { + switch (this) { + case COMPATIBLE: + return ParquetProperties.WriterVersion.PARQUET_1_0; + case OPTIMIZED: + return ParquetProperties.WriterVersion.PARQUET_2_0; + default: + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + this.name(), Arrays.asList(IcebergSerializationPolicy.values()))); + } + } + } + public enum WriteMode { CLOUD_STORAGE, REST_API, diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index c73269748..c3126847d 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -15,7 +15,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; +import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory; import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -43,6 +43,10 @@ public class BdecParquetWriter implements AutoCloseable { // Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end // up throwing private final Optional maxRowGroups; + + private final ParquetProperties.WriterVersion writerVersion; + private final boolean enableDictionaryEncoding; + private long rowsWritten = 0; /** @@ -63,10 +67,14 @@ public BdecParquetWriter( String channelName, long maxChunkSizeInBytes, Optional maxRowGroups, - Constants.BdecParquetCompression bdecParquetCompression) + Constants.BdecParquetCompression bdecParquetCompression, + ParquetProperties.WriterVersion writerVersion, + boolean enableDictionaryEncoding) throws IOException { OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes); this.maxRowGroups = maxRowGroups; + this.writerVersion = writerVersion; + this.enableDictionaryEncoding = enableDictionaryEncoding; ParquetProperties encodingProps = createParquetProperties(); Configuration conf = new Configuration(); WriteSupport> writeSupport = @@ -154,7 +162,7 @@ public void close() throws IOException { } } - private static ParquetProperties createParquetProperties() { + private ParquetProperties createParquetProperties() { /** * There are two main limitations on the server side that we have to overcome by tweaking * Parquet limits: @@ -182,11 +190,11 @@ private static ParquetProperties createParquetProperties() { return ParquetProperties.builder() // PARQUET_2_0 uses Encoding.DELTA_BYTE_ARRAY for byte arrays (e.g. 
SF sb16) // server side does not support it TODO: SNOW-657238 - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) - .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) + .withWriterVersion(writerVersion) + .withValuesWriterFactory(new DefaultValuesWriterFactory()) // the dictionary encoding (Encoding.*_DICTIONARY) is not supported by server side // scanner yet - .withDictionaryEncoding(false) + .withDictionaryEncoding(enableDictionaryEncoding) .withPageRowCountLimit(Integer.MAX_VALUE) .withMinRowCountForPageSizeCheck(Integer.MAX_VALUE) .build(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 1330152a4..04a740272 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -14,6 +14,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; @@ -76,7 +77,11 @@ private List> createChannelDataPerTable(int metada schema, 100L, isIceberg ? Optional.of(1) : Optional.empty(), - Constants.BdecParquetCompression.GZIP)) + Constants.BdecParquetCompression.GZIP, + isIceberg + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, + isIceberg)) .when(channelData) .createFlusher(); @@ -90,7 +95,11 @@ private List> createChannelDataPerTable(int metada "CHANNEL", 1000, isIceberg ? Optional.of(1) : Optional.empty(), - Constants.BdecParquetCompression.GZIP); + Constants.BdecParquetCompression.GZIP, + isIceberg + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, + isIceberg); bdecParquetWriter.writeRow(Collections.singletonList("1")); channelData.setVectors( new ParquetChunkData( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index ef3684774..d9a207ee8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -13,6 +13,7 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.OpenChannelRequest; +import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,7 +62,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -75,7 +80,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? 
ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -89,7 +98,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channel1); cache.addChannel(channel2); cache.addChannel(channel3); @@ -115,7 +128,11 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName)); @@ -133,7 +150,11 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channelDup); // The old channel should be invalid now Assert.assertTrue(!channel.isValid()); @@ -214,7 +235,11 @@ public void testRemoveChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.removeChannelIfSequencersMatch(channel3Dup); // Verify that remove the same channel with a different channel sequencer is a no op Assert.assertEquals(1, cache.getSize()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index ca0fd5295..a7e4ba35b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -52,6 +52,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.junit.Assert; @@ -302,8 +303,10 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( encryptionKeyId, onErrorOption, defaultTimezone, - Constants.BdecVersion.THREE, - null); + null, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); } @Override @@ -1061,7 +1064,11 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -1076,7 +1083,11 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? 
ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channelCache.addChannel(channel1); channelCache.addChannel(channel2); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index ebe6742a2..b2b81adad 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -15,6 +15,7 @@ import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -78,7 +79,11 @@ public void setUpBeforeAll() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); // Setup column fields and vectors ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 5d7873b95..e1cb764dd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -28,6 +28,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetReader; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -155,6 +156,9 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o isIcebergMode ? Optional.of(1) : Optional.empty(), isIcebergMode), null, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, null); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index b3e2d23d6..3c0bdfc2b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -48,6 +48,7 @@ import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.column.ParquetProperties; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -199,7 +200,11 @@ public void testChannelValid() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); Assert.assertTrue(channel.isValid()); channel.invalidate("from testChannelValid", "Invalidated by test"); @@ -247,7 +252,11 @@ public void testChannelClose() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? 
ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); Assert.assertFalse(channel.isClosed()); channel.markClosed(); @@ -545,7 +554,11 @@ public void testInsertRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); @@ -631,7 +644,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); InsertValidationResponse insertValidationResponse = channel.insertRow(row, "token-1"); @@ -655,7 +672,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.ABORT, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); try { @@ -680,7 +701,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.SKIP_BATCH, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); insertValidationResponse = channel.insertRow(row, "token-1"); @@ -711,7 +736,11 @@ public void testInsertRowThrottling() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = @@ -755,7 +784,11 @@ public void testFlush() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -789,7 +822,11 @@ public void testClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -821,7 +858,11 @@ public void testDropOnClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -856,7 +897,11 @@ public void testDropOnCloseInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? 
ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -887,7 +932,11 @@ public void testGetLatestCommittedOffsetToken() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 14de4342b..627593e82 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -61,6 +61,7 @@ import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.bouncycastle.asn1.nist.NISTObjectIdentifiers; @@ -131,7 +132,7 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -147,7 +148,7 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); channel3 = new SnowflakeStreamingIngestChannelInternal<>( @@ -163,7 +164,7 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); channel4 = new SnowflakeStreamingIngestChannelInternal<>( @@ -179,7 +180,7 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); } @@ -376,7 +377,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = @@ -438,7 +439,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); try { @@ -491,7 +492,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); ChannelMetadata channelMetadata = @@ -1066,7 +1067,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( channel2Name, @@ -1080,7 +1085,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? 
ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); client.getChannelCache().addChannel(channel1); client.getChannelCache().addChannel(channel2); @@ -1186,7 +1195,11 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); client.getChannelCache().addChannel(channel); ChannelsStatusResponse response = new ChannelsStatusResponse(); @@ -1257,7 +1270,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, + null, null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 3c58fb000..545a4827b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -66,19 +66,52 @@ public abstract class AbstractDataTypeTest { protected static final ObjectMapper objectMapper = new ObjectMapper(); @Parameters(name = "{index}: {0}") - public static Object[] compressionAlgorithms() { + public static Object[] parameters() { return new Object[] {"GZIP", "ZSTD"}; } @Parameter public String compressionAlgorithm; - public void before(boolean isIceberg) throws Exception { + public void before() throws Exception { + setUp( + false /* isIceberg */, + compressionAlgorithm, + Constants.IcebergSerializationPolicy.COMPATIBLE); + } + + public void beforeIceberg( + String compressionAlgorithm, Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { + setUp(true /* isIceberg */, compressionAlgorithm, serializationPolicy); + } + + protected void setUp( + boolean isIceberg, + String compressionAlgorithm, + Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier()); conn = TestUtils.getConnection(true); conn.createStatement().execute(String.format("create or replace database %s;", databaseName)); conn.createStatement().execute(String.format("use database %s;", databaseName)); conn.createStatement().execute(String.format("use schema %s;", schemaName)); + switch (serializationPolicy) { + case COMPATIBLE: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';", + schemaName)); + break; + case OPTIMIZED: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", schemaName)); + break; + } + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java index a379f27ad..9f3f69751 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java @@ -7,7 +7,7 @@ public 
class BinaryIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java index 7695a5a12..7eabb40e6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java @@ -21,7 +21,7 @@ public class DateTimeIT extends AbstractDataTypeTest { @Before public void setup() throws Exception { - super.before(false); + super.before(); // Set to a random time zone not to interfere with any of the tests conn.createStatement().execute("alter session set timezone = 'America/New_York';"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index 3fe53aed5..a9606cf53 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -10,18 +10,34 @@ import java.time.OffsetTime; import java.time.ZoneOffset; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergDateTimeIT extends AbstractDataTypeTest { + + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java index 33e062d72..9ff465b33 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java @@ -1,18 +1,33 @@ package net.snowflake.ingest.streaming.internal.datatypes; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergLogicalTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { 
+ {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index b8df92228..e4b0783d4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -2,18 +2,33 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergNumericTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java index 9ee7788a1..515a0305c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java @@ -2,6 +2,7 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.lang3.StringUtils; @@ -9,12 +10,26 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStringIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + 
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 6bcc34635..f99e82d30 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -7,18 +7,33 @@ import java.util.UUID; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStructuredIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java index 526d4e17d..721ce3f52 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java @@ -8,7 +8,7 @@ public class LogicalTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java index 23e933c14..5d1843aa1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java @@ -7,7 +7,7 @@ public class NullIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java index ebdb37015..e3d5300a0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java @@ -8,7 +8,7 @@ public class NumericTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java index b2eaa8d48..5905f92d9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java @@ -22,8 +22,9 @@ public class SemiStructuredIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } + // TODO SNOW-664249: There is a few-byte mismatch between the value sent by the user and its // server-side representation. Validation leaves a small buffer for this difference. private static final int MAX_ALLOWED_LENGTH = 16 * 1024 * 1024 - 64; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java index acd9b2283..e11b82005 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java @@ -19,7 +19,7 @@ public class StringsIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java index d8dbcc72f..fb253e5ea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java @@ -23,7 +23,7 @@ public class ColumnNamesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test
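
For reviewers, a minimal illustrative sketch (not part of the patch) of the writer-version selection this change introduces at channel-open time. The isIcebergMode flag and the "OPTIMIZED" literal are stand-ins for client configuration and for response.getIcebergSerializationPolicy(); the rest uses only types added or touched in the diff.

import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.column.ParquetProperties;

public final class WriterVersionSelectionSketch {
  public static void main(String[] args) {
    boolean isIcebergMode = true; // assumption: an Iceberg-mode client
    String policyFromResponse = "OPTIMIZED"; // stand-in for response.getIcebergSerializationPolicy()

    // Mirrors the new openChannel logic: Iceberg channels map the schema's
    // serialization policy to a Parquet writer version; BDEC channels stay on v1.
    ParquetProperties.WriterVersion writerVersion =
        isIcebergMode
            ? Constants.IcebergSerializationPolicy.valueOf(policyFromResponse)
                .toParquetWriterVersion() // COMPATIBLE -> PARQUET_1_0, OPTIMIZED -> PARQUET_2_0
            : ParquetProperties.WriterVersion.PARQUET_1_0;

    System.out.println(writerVersion); // prints PARQUET_2_0
  }
}

The selected version is then handed to the channel builder via setParquetWriterVersion(...) and flows through the row buffer to the flusher, so a single channel always writes one consistent Parquet format.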
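
And a hedged sketch of the encoding properties BdecParquetWriter now builds, assuming the same parquet-mr builder APIs used in the diff. DefaultValuesWriterFactory (replacing DefaultV1ValuesWriterFactory) picks plain v1 or delta v2 value writers based on the configured writer version, and dictionary encoding becomes a parameter instead of being hard-coded to false.

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;

public final class EncodingPropsSketch {
  // Same shape as the patched createParquetProperties(): only writerVersion and
  // enableDictionaryEncoding vary per table type.
  static ParquetProperties props(
      ParquetProperties.WriterVersion writerVersion, boolean enableDictionaryEncoding) {
    return ParquetProperties.builder()
        .withWriterVersion(writerVersion) // PARQUET_1_0 for BDEC/COMPATIBLE, PARQUET_2_0 for OPTIMIZED
        .withValuesWriterFactory(new DefaultValuesWriterFactory())
        .withDictionaryEncoding(enableDictionaryEncoding) // true only in Iceberg mode
        .withPageRowCountLimit(Integer.MAX_VALUE)
        .withMinRowCountForPageSizeCheck(Integer.MAX_VALUE)
        .build();
  }

  public static void main(String[] args) {
    ParquetProperties icebergOptimized = props(ParquetProperties.WriterVersion.PARQUET_2_0, true);
    System.out.println(icebergOptimized.getWriterVersion()); // prints PARQUET_2_0
  }
}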