From 756771e2c71f959bf7b83e368e8f89bba6f402b5 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 9 Oct 2024 21:25:38 +0000 Subject: [PATCH 1/4] v2 --- .../streaming/internal/BlobBuilder.java | 11 +++-- .../internal/ChannelFlushContext.java | 11 ++++- .../internal/ClientBufferParameters.java | 8 +++- .../internal/OpenChannelResponse.java | 11 +++++ .../streaming/internal/ParquetFlusher.java | 20 ++++++++- .../streaming/internal/ParquetRowBuffer.java | 3 +- ...nowflakeStreamingIngestChannelFactory.java | 11 ++++- ...owflakeStreamingIngestChannelInternal.java | 18 ++++++-- ...nowflakeStreamingIngestClientInternal.java | 2 + .../net/snowflake/ingest/utils/Constants.java | 42 ++++++++++++++++++- .../parquet/hadoop/BdecParquetWriter.java | 20 ++++++--- .../streaming/internal/BlobBuilderTest.java | 22 ++++++++-- .../streaming/internal/FlushServiceTest.java | 14 +++++-- .../streaming/internal/RowBufferTest.java | 14 ++++++- .../SnowflakeStreamingIngestClientTest.java | 24 +++++++---- 15 files changed, 197 insertions(+), 34 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 307093d3e..487bc5cbe 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -32,6 +32,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.hadoop.ParquetFileWriter; /** * Build a single blob file that contains file header plus data. The header will be a @@ -68,8 +69,12 @@ static Blob constructBlobAndMetadata( List>> blobData, Constants.BdecVersion bdecVersion, InternalParameterProvider internalParameterProvider) - throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, - InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, + throws IOException, + NoSuchPaddingException, + NoSuchAlgorithmException, + InvalidAlgorithmParameterException, + InvalidKeyException, + IllegalBlockSizeException, BadPaddingException { List chunksMetadataList = new ArrayList<>(); List chunksDataList = new ArrayList<>(); @@ -140,7 +145,7 @@ static Blob constructBlobAndMetadata( if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { chunkMetadataBuilder - .setMajorVersion(Constants.PARQUET_MAJOR_VERSION) + .setMajorVersion(ParquetFileWriter.CURRENT_VERSION) .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index fe9542267..1de924cc5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * Channel immutable identification and encryption attributes. @@ -29,6 +30,8 @@ class ChannelFlushContext { // Data encryption key id private final Long encryptionKeyId; + private final ParquetProperties.WriterVersion parquetWriterVersion; + ChannelFlushContext( String name, String dbName, @@ -36,7 +39,8 @@ class ChannelFlushContext { String tableName, Long channelSequencer, String encryptionKey, - Long encryptionKeyId) { + Long encryptionKeyId, + ParquetProperties.WriterVersion parquetWriterVersion) { this.name = name; this.fullyQualifiedName = Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); @@ -47,6 +51,7 @@ class ChannelFlushContext { this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; + this.parquetWriterVersion = parquetWriterVersion; } @Override @@ -115,4 +120,8 @@ String getEncryptionKey() { Long getEncryptionKeyId() { return encryptionKeyId; } + + ParquetProperties.WriterVersion getParquetWriterVersion() { + return parquetWriterVersion; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 0a9711ee8..5699c56f5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -47,7 +47,9 @@ private ClientBufferParameters( this.isIcebergMode = isIcebergMode; } - /** @param clientInternal reference to the client object where the relevant parameters are set */ + /** + * @param clientInternal reference to the client object where the relevant parameters are set + */ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) { this.maxChunkSizeInBytes = clientInternal != null @@ -124,4 +126,8 @@ public Optional getMaxRowGroups() { public String getParquetMessageTypeName() { return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME; } + + public boolean isEnableDictionaryEncoding() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index 92f7ea8c5..dce6f060f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import net.snowflake.ingest.utils.Constants; /** Response to the OpenChannelRequest */ class OpenChannelResponse extends StreamingIngestResponse { @@ -22,6 +23,7 @@ class OpenChannelResponse extends StreamingIngestResponse { private String encryptionKey; private Long encryptionKeyId; private FileLocationInfo icebergLocationInfo; + private String icebergSerializationPolicy; @JsonProperty("status_code") void setStatusCode(Long statusCode) { @@ -140,4 +142,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) { FileLocationInfo getIcebergLocationInfo() { return this.icebergLocationInfo; } + + @JsonProperty("iceberg_serialization_policy") + void setIcebergSerializationPolicy(String icebergSerializationPolicy) { + this.icebergSerializationPolicy = icebergSerializationPolicy; + } + + Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() { + return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index fcdd9cdfc..3ee240226 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -16,6 +16,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.MessageType; @@ -30,17 +31,20 @@ public class ParquetFlusher implements Flusher { private final Optional maxRowGroups; private final Constants.BdecParquetCompression bdecParquetCompression; + private final boolean enableDictionaryEncoding; /** Construct parquet flusher from its schema. */ public ParquetFlusher( MessageType schema, long maxChunkSizeInBytes, Optional maxRowGroups, - Constants.BdecParquetCompression bdecParquetCompression) { + Constants.BdecParquetCompression bdecParquetCompression, + boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; + this.enableDictionaryEncoding = enableDictionaryEncoding; } @Override @@ -62,6 +66,7 @@ private SerializationResult serializeFromJavaObjects( BdecParquetWriter parquetWriter; ByteArrayOutputStream mergedData = new ByteArrayOutputStream(); Pair chunkMinMaxInsertTimeInMs = null; + ParquetProperties.WriterVersion parquetWriterVersion = null; for (ChannelData data : channelsDataPerTable) { // Create channel metadata @@ -103,6 +108,15 @@ private SerializationResult serializeFromJavaObjects( chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); } + // Check if all the channels have the same parquet writer version + if (parquetWriterVersion == null) { + parquetWriterVersion = data.getChannelContext().getParquetWriterVersion(); + } else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Parquet writer version and storage serialization policy mismatch within a chunk"); + } + rows.addAll(data.getVectors().rows); rowCount += data.getRowCount(); @@ -129,7 +143,9 @@ private SerializationResult serializeFromJavaObjects( firstChannelFullyQualifiedTableName, maxChunkSizeInBytes, maxRowGroups, - bdecParquetCompression); + bdecParquetCompression, + parquetWriterVersion, + enableDictionaryEncoding); rows.forEach(parquetWriter::writeRow); parquetWriter.close(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 339210f65..78dba9cce 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -394,6 +394,7 @@ public Flusher createFlusher() { schema, clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getMaxRowGroups(), - clientBufferParameters.getBdecParquetCompression()); + clientBufferParameters.getBdecParquetCompression(), + clientBufferParameters.isEnableDictionaryEncoding()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index a56b82ed5..9c39adc80 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -8,6 +8,7 @@ import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** Builds a Streaming Ingest channel for a specific Streaming Ingest client */ class SnowflakeStreamingIngestChannelFactory { @@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder { private OpenChannelRequest.OnErrorOption onErrorOption; private ZoneId defaultTimezone; private OffsetTokenVerificationFunction offsetTokenVerificationFunction; + private ParquetProperties.WriterVersion parquetWriterVersion; private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; @@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder setOffsetTokenVerificationFunction( return this; } + SnowflakeStreamingIngestChannelBuilder setParquetWriterVersion( + ParquetProperties.WriterVersion parquetWriterVersion) { + this.parquetWriterVersion = parquetWriterVersion; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -124,7 +132,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.onErrorOption, this.defaultTimezone, this.owningClient.getParameterProvider().getBlobFormatVersion(), - this.offsetTokenVerificationFunction); + this.offsetTokenVerificationFunction, + this.parquetWriterVersion); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4e884387b..af12b31ea 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -30,6 +30,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestChannel @@ -107,7 +108,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn onErrorOption, defaultTimezone, client.getParameterProvider().getBlobFormatVersion(), - null); + null /* offsetTokenVerificationFunction */, + null /* parquetWriterVersion */); } /** Default constructor */ @@ -125,7 +127,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, Constants.BdecVersion bdecVersion, - OffsetTokenVerificationFunction offsetTokenVerificationFunction) { + OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion) { this.isClosed = false; this.owningClient = client; @@ -141,7 +144,16 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance(); this.channelFlushContext = new ChannelFlushContext( - name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); + name, + dbName, + schemaName, + tableName, + channelSequencer, + encryptionKey, + encryptionKeyId, + parquetWriterVersion == null + ? ParquetProperties.DEFAULT_WRITER_VERSION + : parquetWriterVersion); this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true); this.rowBuffer = AbstractRowBuffer.createRowBuffer( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 988189475..ee681775b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -386,6 +386,8 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .setParquetWriterVersion( + response.getIcebergSerializationPolicy().getParquetWriterVersion()) .build(); // Setup the row buffer schema diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index cb4bacf92..148339ea4 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.utils; import java.util.Arrays; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** Contains all the constants needed for Streaming Ingest */ @@ -71,9 +72,48 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; - public static final int PARQUET_MAJOR_VERSION = 1; public static final int PARQUET_MINOR_VERSION = 0; + /** + * Iceberg table serialization policy. Use v2 parquet writer for optimized serialization, + * otherwise v1. + */ + public enum IcebergSerializationPolicy { + NON_ICEBERG, + COMPATIBLE, + OPTIMIZED; + + public static IcebergSerializationPolicy fromName(String name) { + if (name == null) { + return NON_ICEBERG; + } + for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) { + if (e.name().equalsIgnoreCase(name)) { + return e; + } + } + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + name, Arrays.asList(IcebergSerializationPolicy.values()))); + } + + public ParquetProperties.WriterVersion getParquetWriterVersion() { + switch (this) { + case NON_ICEBERG: + case COMPATIBLE: + return ParquetProperties.WriterVersion.PARQUET_1_0; + case OPTIMIZED: + return ParquetProperties.WriterVersion.PARQUET_2_0; + default: + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + this.name(), Arrays.asList(IcebergSerializationPolicy.values()))); + } + } + } + public enum WriteMode { CLOUD_STORAGE, REST_API, diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index c73269748..c3126847d 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -15,7 +15,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; +import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory; import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -43,6 +43,10 @@ public class BdecParquetWriter implements AutoCloseable { // Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end // up throwing private final Optional maxRowGroups; + + private final ParquetProperties.WriterVersion writerVersion; + private final boolean enableDictionaryEncoding; + private long rowsWritten = 0; /** @@ -63,10 +67,14 @@ public BdecParquetWriter( String channelName, long maxChunkSizeInBytes, Optional maxRowGroups, - Constants.BdecParquetCompression bdecParquetCompression) + Constants.BdecParquetCompression bdecParquetCompression, + ParquetProperties.WriterVersion writerVersion, + boolean enableDictionaryEncoding) throws IOException { OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes); this.maxRowGroups = maxRowGroups; + this.writerVersion = writerVersion; + this.enableDictionaryEncoding = enableDictionaryEncoding; ParquetProperties encodingProps = createParquetProperties(); Configuration conf = new Configuration(); WriteSupport> writeSupport = @@ -154,7 +162,7 @@ public void close() throws IOException { } } - private static ParquetProperties createParquetProperties() { + private ParquetProperties createParquetProperties() { /** * There are two main limitations on the server side that we have to overcome by tweaking * Parquet limits: @@ -182,11 +190,11 @@ private static ParquetProperties createParquetProperties() { return ParquetProperties.builder() // PARQUET_2_0 uses Encoding.DELTA_BYTE_ARRAY for byte arrays (e.g. SF sb16) // server side does not support it TODO: SNOW-657238 - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) - .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) + .withWriterVersion(writerVersion) + .withValuesWriterFactory(new DefaultValuesWriterFactory()) // the dictionary encoding (Encoding.*_DICTIONARY) is not supported by server side // scanner yet - .withDictionaryEncoding(false) + .withDictionaryEncoding(enableDictionaryEncoding) .withPageRowCountLimit(Integer.MAX_VALUE) .withMinRowCountForPageSizeCheck(Integer.MAX_VALUE) .build(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 1330152a4..cf254c193 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -14,6 +14,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetWriter; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; @@ -76,7 +77,8 @@ private List> createChannelDataPerTable(int metada schema, 100L, isIceberg ? Optional.of(1) : Optional.empty(), - Constants.BdecParquetCompression.GZIP)) + Constants.BdecParquetCompression.GZIP, + isIceberg)) .when(channelData) .createFlusher(); @@ -90,7 +92,11 @@ private List> createChannelDataPerTable(int metada "CHANNEL", 1000, isIceberg ? Optional.of(1) : Optional.empty(), - Constants.BdecParquetCompression.GZIP); + Constants.BdecParquetCompression.GZIP, + isIceberg + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, + isIceberg); bdecParquetWriter.writeRow(Collections.singletonList("1")); channelData.setVectors( new ParquetChunkData( @@ -115,7 +121,17 @@ private List> createChannelDataPerTable(int metada .named("test")) : new RowBufferStats(columnName, null, 1, null, null)); channelData.setChannelContext( - new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L)); + new ChannelFlushContext( + "channel1", + "DB", + "SCHEMA", + "TABLE", + 1L, + "enc", + 1L, + isIceberg + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0)); return Collections.singletonList(channelData); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index ca0fd5295..bd2fc0043 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -52,6 +52,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.junit.Assert; @@ -303,7 +304,10 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( onErrorOption, defaultTimezone, Constants.BdecVersion.THREE, - null); + null, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); } @Override @@ -1227,8 +1231,12 @@ public void testShutDown() throws Exception { @Test public void testEncryptionDecryption() - throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, - NoSuchAlgorithmException, BadPaddingException, InvalidKeyException { + throws InvalidAlgorithmParameterException, + NoSuchPaddingException, + IllegalBlockSizeException, + NoSuchAlgorithmException, + BadPaddingException, + InvalidKeyException { byte[] data = "testEncryptionDecryption".getBytes(StandardCharsets.UTF_8); String encryptionKey = Base64.getEncoder().encodeToString("encryption_key".getBytes(StandardCharsets.UTF_8)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 5d7873b95..5ce58371e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -28,6 +28,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetReader; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -1924,7 +1925,18 @@ public void testParquetFileNameMetadata() throws IOException { bufferUnderTest.setupSchema(Collections.singletonList(colChar)); loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); ChannelData data = bufferUnderTest.flush(); - data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L)); + data.setChannelContext( + new ChannelFlushContext( + "name", + "db", + "schema", + "table", + 1L, + "key", + 0L, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0)); ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher(); Flusher.SerializationResult result = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 14de4342b..f4e835772 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -132,7 +132,8 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -148,7 +149,8 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -164,7 +166,8 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -180,7 +183,8 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); } @Test @@ -377,7 +381,8 @@ public void testGetChannelsStatusWithRequest() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -439,7 +444,8 @@ public void testGetChannelsStatusWithRequestError() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -492,7 +498,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); ChannelMetadata channelMetadata = ChannelMetadata.builder() @@ -1258,7 +1265,8 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, - null); + null, + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); From 7fa0e3a4cab63394e0c8feb8429b8c9d0e866610 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 9 Oct 2024 22:23:18 +0000 Subject: [PATCH 2/4] Fix ITs --- .../streaming/internal/BlobBuilder.java | 8 +--- .../internal/ClientBufferParameters.java | 4 +- .../streaming/internal/FlushServiceTest.java | 8 +--- .../SnowflakeStreamingIngestClientTest.java | 16 ++++---- .../datatypes/AbstractDataTypeTest.java | 37 ++++++++++++++++++- .../internal/datatypes/BinaryIT.java | 2 +- .../internal/datatypes/DateTimeIT.java | 2 +- .../internal/datatypes/IcebergDateTimeIT.java | 20 +++++++++- .../datatypes/IcebergLogicalTypesIT.java | 19 +++++++++- .../datatypes/IcebergNumericTypesIT.java | 19 +++++++++- .../internal/datatypes/IcebergStringIT.java | 19 +++++++++- .../datatypes/IcebergStructuredIT.java | 19 +++++++++- .../internal/datatypes/LogicalTypesIT.java | 2 +- .../streaming/internal/datatypes/NullIT.java | 2 +- .../internal/datatypes/NumericTypesIT.java | 2 +- .../internal/datatypes/SemiStructuredIT.java | 3 +- .../internal/datatypes/StringsIT.java | 2 +- .../streaming/internal/it/ColumnNamesIT.java | 2 +- 18 files changed, 148 insertions(+), 38 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 487bc5cbe..3e1de452a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -69,12 +69,8 @@ static Blob constructBlobAndMetadata( List>> blobData, Constants.BdecVersion bdecVersion, InternalParameterProvider internalParameterProvider) - throws IOException, - NoSuchPaddingException, - NoSuchAlgorithmException, - InvalidAlgorithmParameterException, - InvalidKeyException, - IllegalBlockSizeException, + throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, + InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { List chunksMetadataList = new ArrayList<>(); List chunksDataList = new ArrayList<>(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 5699c56f5..9009642b3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -47,9 +47,7 @@ private ClientBufferParameters( this.isIcebergMode = isIcebergMode; } - /** - * @param clientInternal reference to the client object where the relevant parameters are set - */ + /** @param clientInternal reference to the client object where the relevant parameters are set */ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) { this.maxChunkSizeInBytes = clientInternal != null diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index bd2fc0043..9043d2ff5 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -1231,12 +1231,8 @@ public void testShutDown() throws Exception { @Test public void testEncryptionDecryption() - throws InvalidAlgorithmParameterException, - NoSuchPaddingException, - IllegalBlockSizeException, - NoSuchAlgorithmException, - BadPaddingException, - InvalidKeyException { + throws InvalidAlgorithmParameterException, NoSuchPaddingException, IllegalBlockSizeException, + NoSuchAlgorithmException, BadPaddingException, InvalidKeyException { byte[] data = "testEncryptionDecryption".getBytes(StandardCharsets.UTF_8); String encryptionKey = Base64.getEncoder().encodeToString("encryption_key".getBytes(StandardCharsets.UTF_8)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index f4e835772..c468f03ea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -133,7 +133,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -150,7 +150,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -167,7 +167,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); channel4 = new SnowflakeStreamingIngestChannelInternal<>( "channel4", @@ -184,7 +184,7 @@ public void setup() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); } @Test @@ -382,7 +382,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); @@ -445,7 +445,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); try { client.getChannelsStatus(Collections.singletonList(channel)); @@ -499,7 +499,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelMetadata channelMetadata = ChannelMetadata.builder() @@ -1266,7 +1266,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { ZoneOffset.UTC, BDEC_VERSION, null, - null); + null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = new ChannelsStatusRequest.ChannelStatusRequestDTO(channel); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 3c58fb000..a1dfc6e2c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -66,19 +66,52 @@ public abstract class AbstractDataTypeTest { protected static final ObjectMapper objectMapper = new ObjectMapper(); @Parameters(name = "{index}: {0}") - public static Object[] compressionAlgorithms() { + public static Object[] parameters() { return new Object[] {"GZIP", "ZSTD"}; } @Parameter public String compressionAlgorithm; - public void before(boolean isIceberg) throws Exception { + public void before() throws Exception { + setUp( + false /* isIceberg */, + compressionAlgorithm, + Constants.IcebergSerializationPolicy.NON_ICEBERG); + } + + public void beforeIceberg( + String compressionAlgorithm, Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { + setUp(true /* isIceberg */, compressionAlgorithm, serializationPolicy); + } + + protected void setUp( + boolean isIceberg, + String compressionAlgorithm, + Constants.IcebergSerializationPolicy serializationPolicy) + throws Exception { databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier()); conn = TestUtils.getConnection(true); conn.createStatement().execute(String.format("create or replace database %s;", databaseName)); conn.createStatement().execute(String.format("use database %s;", databaseName)); conn.createStatement().execute(String.format("use schema %s;", schemaName)); + switch (serializationPolicy) { + case COMPATIBLE: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';", + schemaName)); + break; + case OPTIMIZED: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", schemaName)); + break; + } + conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java index a379f27ad..9f3f69751 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/BinaryIT.java @@ -7,7 +7,7 @@ public class BinaryIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java index 7695a5a12..7eabb40e6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/DateTimeIT.java @@ -21,7 +21,7 @@ public class DateTimeIT extends AbstractDataTypeTest { @Before public void setup() throws Exception { - super.before(false); + super.before(); // Set to a random time zone not to interfere with any of the tests conn.createStatement().execute("alter session set timezone = 'America/New_York';"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index 3fe53aed5..ddb8f278b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -10,18 +10,36 @@ import java.time.OffsetTime; import java.time.ZoneOffset; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergDateTimeIT extends AbstractDataTypeTest { + + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java index 33e062d72..985c486a6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java @@ -1,18 +1,35 @@ package net.snowflake.ingest.streaming.internal.datatypes; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergLogicalTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index b8df92228..1734b1d71 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -2,18 +2,35 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergNumericTypesIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java index 9ee7788a1..d30be6d0b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java @@ -2,6 +2,7 @@ import java.math.BigDecimal; import java.util.Arrays; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.lang3.StringUtils; @@ -9,12 +10,28 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStringIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 6bcc34635..3ca84f4e4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -7,18 +7,35 @@ import java.util.UUID; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergStructuredIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") + public static Object[][] parameters() { + return new Object[][] { + {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, + {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, + {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + }; + } + + @Parameterized.Parameter public static String compressionAlgorithm; + + @Parameterized.Parameter(1) + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + @Before public void before() throws Exception { - super.before(true); + super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java index 526d4e17d..721ce3f52 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/LogicalTypesIT.java @@ -8,7 +8,7 @@ public class LogicalTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java index 23e933c14..5d1843aa1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NullIT.java @@ -7,7 +7,7 @@ public class NullIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java index ebdb37015..e3d5300a0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/NumericTypesIT.java @@ -8,7 +8,7 @@ public class NumericTypesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java index b2eaa8d48..5905f92d9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/SemiStructuredIT.java @@ -22,8 +22,9 @@ public class SemiStructuredIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } + // TODO SNOW-664249: There is a few-byte mismatch between the value sent by the user and its // server-side representation. Validation leaves a small buffer for this difference. private static final int MAX_ALLOWED_LENGTH = 16 * 1024 * 1024 - 64; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java index acd9b2283..e11b82005 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/StringsIT.java @@ -19,7 +19,7 @@ public class StringsIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java index d8dbcc72f..fb253e5ea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java @@ -23,7 +23,7 @@ public class ColumnNamesIT extends AbstractDataTypeTest { @Before public void before() throws Exception { - super.before(false); + super.before(); } @Test From 1784af85bb49bed9666e7f0a87bfed9c7377ecdf Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 9 Oct 2024 22:32:11 +0000 Subject: [PATCH 3/4] delete gzip --- .../ingest/streaming/internal/datatypes/IcebergDateTimeIT.java | 2 -- .../streaming/internal/datatypes/IcebergLogicalTypesIT.java | 2 -- .../streaming/internal/datatypes/IcebergNumericTypesIT.java | 2 -- .../ingest/streaming/internal/datatypes/IcebergStringIT.java | 2 -- .../streaming/internal/datatypes/IcebergStructuredIT.java | 2 -- 5 files changed, 10 deletions(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index ddb8f278b..a9606cf53 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -25,8 +25,6 @@ public class IcebergDateTimeIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") public static Object[][] parameters() { return new Object[][] { - {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} }; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java index 985c486a6..9ff465b33 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergLogicalTypesIT.java @@ -15,8 +15,6 @@ public class IcebergLogicalTypesIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") public static Object[][] parameters() { return new Object[][] { - {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} }; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index 1734b1d71..e4b0783d4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -16,8 +16,6 @@ public class IcebergNumericTypesIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") public static Object[][] parameters() { return new Object[][] { - {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} }; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java index d30be6d0b..515a0305c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStringIT.java @@ -17,8 +17,6 @@ public class IcebergStringIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") public static Object[][] parameters() { return new Object[][] { - {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} }; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 3ca84f4e4..f99e82d30 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -21,8 +21,6 @@ public class IcebergStructuredIT extends AbstractDataTypeTest { @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") public static Object[][] parameters() { return new Object[][] { - {"GZIP", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"GZIP", Constants.IcebergSerializationPolicy.OPTIMIZED}, {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} }; From 7356bee2ff478e2b722550b04351dbcf86755a9a Mon Sep 17 00:00:00 2001 From: Hitesh Madan Date: Fri, 11 Oct 2024 22:28:35 -0700 Subject: [PATCH 4/4] (Stacked PR) Code review changes for Parquet V2 (#862) 1. Remove logic from OpenChannelResponse contract class 2. Move writerVersion defaulting to channel construction callsite in clientInternal (from channel ctor), instead of passing writerVersion=null into the channel. 3. Pass around writerVersion via RowBuffer into Flusher, instead of via the per-chunk flushContext. 4. Remove a test-only overload of ChannelInternal 5. Remove an unnecessary parameter on ChannelInternal ctor (bdecVersion) 6. Remove SerializationPolicy.NON_ICEBERG, remove the custom SerPolicy.fromName method and use Enum.valueOf that java already has --- .../streaming/internal/AbstractRowBuffer.java | 3 + .../internal/ChannelFlushContext.java | 11 +-- .../internal/OpenChannelResponse.java | 5 +- .../streaming/internal/ParquetFlusher.java | 13 +--- .../streaming/internal/ParquetRowBuffer.java | 6 ++ ...nowflakeStreamingIngestChannelFactory.java | 1 - ...owflakeStreamingIngestChannelInternal.java | 65 ++--------------- ...nowflakeStreamingIngestClientInternal.java | 23 ++++-- .../net/snowflake/ingest/utils/Constants.java | 19 +---- .../streaming/internal/BlobBuilderTest.java | 15 +--- .../streaming/internal/ChannelCacheTest.java | 37 ++++++++-- .../streaming/internal/FlushServiceTest.java | 13 +++- .../internal/InsertRowsBenchmarkTest.java | 7 +- .../streaming/internal/RowBufferTest.java | 16 +--- .../SnowflakeStreamingIngestChannelTest.java | 73 ++++++++++++++++--- .../SnowflakeStreamingIngestClientTest.java | 27 ++++--- .../datatypes/AbstractDataTypeTest.java | 2 +- 17 files changed, 172 insertions(+), 164 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 7b0adaabc..fa502f30a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -25,6 +25,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import org.apache.parquet.column.ParquetProperties; /** * The abstract implementation of the buffer in the Streaming Ingest channel that holds the @@ -668,6 +669,7 @@ static AbstractRowBuffer createRowBuffer( ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters, OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion, TelemetryService telemetryService) { switch (bdecVersion) { case THREE: @@ -681,6 +683,7 @@ static AbstractRowBuffer createRowBuffer( channelRuntimeState, clientBufferParameters, offsetTokenVerificationFunction, + parquetWriterVersion, telemetryService); default: throw new SFException( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index 1de924cc5..fe9542267 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -5,7 +5,6 @@ package net.snowflake.ingest.streaming.internal; import net.snowflake.ingest.utils.Utils; -import org.apache.parquet.column.ParquetProperties; /** * Channel immutable identification and encryption attributes. @@ -30,8 +29,6 @@ class ChannelFlushContext { // Data encryption key id private final Long encryptionKeyId; - private final ParquetProperties.WriterVersion parquetWriterVersion; - ChannelFlushContext( String name, String dbName, @@ -39,8 +36,7 @@ class ChannelFlushContext { String tableName, Long channelSequencer, String encryptionKey, - Long encryptionKeyId, - ParquetProperties.WriterVersion parquetWriterVersion) { + Long encryptionKeyId) { this.name = name; this.fullyQualifiedName = Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); @@ -51,7 +47,6 @@ class ChannelFlushContext { this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; - this.parquetWriterVersion = parquetWriterVersion; } @Override @@ -120,8 +115,4 @@ String getEncryptionKey() { Long getEncryptionKeyId() { return encryptionKeyId; } - - ParquetProperties.WriterVersion getParquetWriterVersion() { - return parquetWriterVersion; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index dce6f060f..0058624af 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import net.snowflake.ingest.utils.Constants; /** Response to the OpenChannelRequest */ class OpenChannelResponse extends StreamingIngestResponse { @@ -148,7 +147,7 @@ void setIcebergSerializationPolicy(String icebergSerializationPolicy) { this.icebergSerializationPolicy = icebergSerializationPolicy; } - Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() { - return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy); + String getIcebergSerializationPolicy() { + return this.icebergSerializationPolicy; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 3ee240226..5b11996ec 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -31,6 +31,7 @@ public class ParquetFlusher implements Flusher { private final Optional maxRowGroups; private final Constants.BdecParquetCompression bdecParquetCompression; + private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean enableDictionaryEncoding; /** Construct parquet flusher from its schema. */ @@ -39,11 +40,13 @@ public ParquetFlusher( long maxChunkSizeInBytes, Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, + ParquetProperties.WriterVersion parquetWriterVersion, boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; + this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; } @@ -66,7 +69,6 @@ private SerializationResult serializeFromJavaObjects( BdecParquetWriter parquetWriter; ByteArrayOutputStream mergedData = new ByteArrayOutputStream(); Pair chunkMinMaxInsertTimeInMs = null; - ParquetProperties.WriterVersion parquetWriterVersion = null; for (ChannelData data : channelsDataPerTable) { // Create channel metadata @@ -108,15 +110,6 @@ private SerializationResult serializeFromJavaObjects( chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); } - // Check if all the channels have the same parquet writer version - if (parquetWriterVersion == null) { - parquetWriterVersion = data.getChannelContext().getParquetWriterVersion(); - } else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - "Parquet writer version and storage serialization policy mismatch within a chunk"); - } - rows.addAll(data.getVectors().rows); rowCount += data.getRowCount(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 78dba9cce..5e3fa1191 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -26,6 +26,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer { private final List> data; private final List> tempData; + private final ParquetProperties.WriterVersion parquetWriterVersion; + private MessageType schema; /** Construct a ParquetRowBuffer object. */ @@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { ChannelRuntimeState channelRuntimeState, ClientBufferParameters clientBufferParameters, OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion, TelemetryService telemetryService) { super( onErrorOption, @@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { this.metadata = new HashMap<>(); this.data = new ArrayList<>(); this.tempData = new ArrayList<>(); + this.parquetWriterVersion = parquetWriterVersion; } /** @@ -395,6 +400,7 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), + parquetWriterVersion, clientBufferParameters.isEnableDictionaryEncoding()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index 9c39adc80..40542c8b0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -131,7 +131,6 @@ SnowflakeStreamingIngestChannelInternal build() { this.encryptionKeyId, this.onErrorOption, this.defaultTimezone, - this.owningClient.getParameterProvider().getBlobFormatVersion(), this.offsetTokenVerificationFunction, this.parquetWriterVersion); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index af12b31ea..167240ed7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -10,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -19,13 +18,13 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import net.snowflake.ingest.streaming.DropChannelRequest; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; @@ -69,49 +68,6 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn private final MemoryInfoProvider memoryInfoProvider; private volatile long freeMemoryInBytes = 0; - /** - * Constructor for TESTING ONLY which allows us to set the test mode - * - * @param name - * @param dbName - * @param schemaName - * @param tableName - * @param offsetToken - * @param channelSequencer - * @param rowSequencer - * @param client - */ - SnowflakeStreamingIngestChannelInternal( - String name, - String dbName, - String schemaName, - String tableName, - String offsetToken, - Long channelSequencer, - Long rowSequencer, - SnowflakeStreamingIngestClientInternal client, - String encryptionKey, - Long encryptionKeyId, - OpenChannelRequest.OnErrorOption onErrorOption, - ZoneOffset defaultTimezone) { - this( - name, - dbName, - schemaName, - tableName, - offsetToken, - channelSequencer, - rowSequencer, - client, - encryptionKey, - encryptionKeyId, - onErrorOption, - defaultTimezone, - client.getParameterProvider().getBlobFormatVersion(), - null /* offsetTokenVerificationFunction */, - null /* parquetWriterVersion */); - } - /** Default constructor */ SnowflakeStreamingIngestChannelInternal( String name, @@ -121,12 +77,11 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String endOffsetToken, Long channelSequencer, Long rowSequencer, - SnowflakeStreamingIngestClientInternal client, + @Nonnull SnowflakeStreamingIngestClientInternal client, String encryptionKey, Long encryptionKeyId, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, - Constants.BdecVersion bdecVersion, OffsetTokenVerificationFunction offsetTokenVerificationFunction, ParquetProperties.WriterVersion parquetWriterVersion) { this.isClosed = false; @@ -144,28 +99,20 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance(); this.channelFlushContext = new ChannelFlushContext( - name, - dbName, - schemaName, - tableName, - channelSequencer, - encryptionKey, - encryptionKeyId, - parquetWriterVersion == null - ? ParquetProperties.DEFAULT_WRITER_VERSION - : parquetWriterVersion); + name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true); this.rowBuffer = AbstractRowBuffer.createRowBuffer( onErrorOption, defaultTimezone, - bdecVersion, + client.getParameterProvider().getBlobFormatVersion(), getFullyQualifiedName(), this::collectRowSize, channelState, new ClientBufferParameters(owningClient), offsetTokenVerificationFunction, - owningClient == null ? null : owningClient.getTelemetryService()); + parquetWriterVersion, + owningClient.getTelemetryService()); this.tableColumns = new HashMap<>(); logger.logInfo( "Channel={} created for table={}", diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index ee681775b..7751499f1 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -71,6 +71,7 @@ import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestClient. The client internally @@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } - if (isIcebergMode - && response.getTableColumns().stream() - .anyMatch(c -> c.getSourceIcebergDataType() == null)) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set"); + if (isIcebergMode) { + if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set"); + } + + if (response.getIcebergSerializationPolicy() == null) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Iceberg Table's open channel response does not have serialization policy set."); + } } logger.logInfo( @@ -387,7 +394,11 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) .setParquetWriterVersion( - response.getIcebergSerializationPolicy().getParquetWriterVersion()) + isIcebergMode + ? Constants.IcebergSerializationPolicy.valueOf( + response.getIcebergSerializationPolicy()) + .toParquetWriterVersion() + : ParquetProperties.WriterVersion.PARQUET_1_0) .build(); // Setup the row buffer schema diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 148339ea4..7198c7669 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -79,28 +79,11 @@ public class Constants { * otherwise v1. */ public enum IcebergSerializationPolicy { - NON_ICEBERG, COMPATIBLE, OPTIMIZED; - public static IcebergSerializationPolicy fromName(String name) { - if (name == null) { - return NON_ICEBERG; - } - for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) { - if (e.name().equalsIgnoreCase(name)) { - return e; - } - } - throw new IllegalArgumentException( - String.format( - "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", - name, Arrays.asList(IcebergSerializationPolicy.values()))); - } - - public ParquetProperties.WriterVersion getParquetWriterVersion() { + public ParquetProperties.WriterVersion toParquetWriterVersion() { switch (this) { - case NON_ICEBERG: case COMPATIBLE: return ParquetProperties.WriterVersion.PARQUET_1_0; case OPTIMIZED: diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index cf254c193..04a740272 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -78,6 +78,9 @@ private List> createChannelDataPerTable(int metada 100L, isIceberg ? Optional.of(1) : Optional.empty(), Constants.BdecParquetCompression.GZIP, + isIceberg + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, isIceberg)) .when(channelData) .createFlusher(); @@ -121,17 +124,7 @@ private List> createChannelDataPerTable(int metada .named("test")) : new RowBufferStats(columnName, null, 1, null, null)); channelData.setChannelContext( - new ChannelFlushContext( - "channel1", - "DB", - "SCHEMA", - "TABLE", - 1L, - "enc", - 1L, - isIceberg - ? ParquetProperties.WriterVersion.PARQUET_2_0 - : ParquetProperties.WriterVersion.PARQUET_1_0)); + new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L)); return Collections.singletonList(channelData); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index ef3684774..d9a207ee8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -13,6 +13,7 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.OpenChannelRequest; +import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,7 +62,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel2 = new SnowflakeStreamingIngestChannelInternal<>( "channel2", @@ -75,7 +80,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel3 = new SnowflakeStreamingIngestChannelInternal<>( "channel3", @@ -89,7 +98,11 @@ public void setup() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channel1); cache.addChannel(channel2); cache.addChannel(channel3); @@ -115,7 +128,11 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channel); Assert.assertEquals(1, cache.getSize()); Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName)); @@ -133,7 +150,11 @@ public void testAddChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.addChannel(channelDup); // The old channel should be invalid now Assert.assertTrue(!channel.isValid()); @@ -214,7 +235,11 @@ public void testRemoveChannel() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); cache.removeChannelIfSequencersMatch(channel3Dup); // Verify that remove the same channel with a different channel sequencer is a no op Assert.assertEquals(1, cache.getSize()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 9043d2ff5..a7e4ba35b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -303,7 +303,6 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( encryptionKeyId, onErrorOption, defaultTimezone, - Constants.BdecVersion.THREE, null, isIcebergMode ? ParquetProperties.WriterVersion.PARQUET_2_0 @@ -1065,7 +1064,11 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -1080,7 +1083,11 @@ public void testInvalidateChannels() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - ZoneOffset.UTC); + ZoneOffset.UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channelCache.addChannel(channel1); channelCache.addChannel(channel2); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index ebe6742a2..b2b81adad 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -15,6 +15,7 @@ import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -78,7 +79,11 @@ public void setUpBeforeAll() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); // Setup column fields and vectors ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 5ce58371e..e1cb764dd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -156,6 +156,9 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o isIcebergMode ? Optional.of(1) : Optional.empty(), isIcebergMode), null, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0, null); } @@ -1925,18 +1928,7 @@ public void testParquetFileNameMetadata() throws IOException { bufferUnderTest.setupSchema(Collections.singletonList(colChar)); loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); ChannelData data = bufferUnderTest.flush(); - data.setChannelContext( - new ChannelFlushContext( - "name", - "db", - "schema", - "table", - 1L, - "key", - 0L, - isIcebergMode - ? ParquetProperties.WriterVersion.PARQUET_2_0 - : ParquetProperties.WriterVersion.PARQUET_1_0)); + data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L)); ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher(); Flusher.SerializationResult result = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index b3e2d23d6..3c0bdfc2b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -48,6 +48,7 @@ import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.column.ParquetProperties; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -199,7 +200,11 @@ public void testChannelValid() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); Assert.assertTrue(channel.isValid()); channel.invalidate("from testChannelValid", "Invalidated by test"); @@ -247,7 +252,11 @@ public void testChannelClose() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); Assert.assertFalse(channel.isClosed()); channel.markClosed(); @@ -545,7 +554,11 @@ public void testInsertRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ColumnMetadata col = new ColumnMetadata(); col.setOrdinal(1); @@ -631,7 +644,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); InsertValidationResponse insertValidationResponse = channel.insertRow(row, "token-1"); @@ -655,7 +672,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.ABORT, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); try { @@ -680,7 +701,11 @@ public void testInsertTooLargeRow() { "key", 1234L, OpenChannelRequest.OnErrorOption.SKIP_BATCH, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); channel.setupSchema(schema); insertValidationResponse = channel.insertRow(row, "token-1"); @@ -711,7 +736,11 @@ public void testInsertRowThrottling() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = @@ -755,7 +784,11 @@ public void testFlush() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -789,7 +822,11 @@ public void testClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -821,7 +858,11 @@ public void testDropOnClose() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -856,7 +897,11 @@ public void testDropOnCloseInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -887,7 +932,11 @@ public void testGetLatestCommittedOffsetToken() { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index c468f03ea..627593e82 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -61,6 +61,7 @@ import net.snowflake.ingest.utils.SnowflakeURL; import net.snowflake.ingest.utils.Utils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.bouncycastle.asn1.nist.NISTObjectIdentifiers; @@ -131,7 +132,6 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); channel2 = @@ -148,7 +148,6 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); channel3 = @@ -165,7 +164,6 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); channel4 = @@ -182,7 +180,6 @@ public void setup() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); } @@ -380,7 +377,6 @@ public void testGetChannelsStatusWithRequest() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); @@ -443,7 +439,6 @@ public void testGetChannelsStatusWithRequestError() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); @@ -497,7 +492,6 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); @@ -1073,7 +1067,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); SnowflakeStreamingIngestChannelInternal channel2 = new SnowflakeStreamingIngestChannelInternal<>( channel2Name, @@ -1087,7 +1085,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); client.getChannelCache().addChannel(channel1); client.getChannelCache().addChannel(channel2); @@ -1193,7 +1195,11 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { "key", 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, - UTC); + UTC, + null /* offsetTokenVerificationFunction */, + isIcebergMode + ? ParquetProperties.WriterVersion.PARQUET_2_0 + : ParquetProperties.WriterVersion.PARQUET_1_0); client.getChannelCache().addChannel(channel); ChannelsStatusResponse response = new ChannelsStatusResponse(); @@ -1264,7 +1270,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { 1234L, OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, - BDEC_VERSION, null, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index a1dfc6e2c..545a4827b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -76,7 +76,7 @@ public void before() throws Exception { setUp( false /* isIceberg */, compressionAlgorithm, - Constants.IcebergSerializationPolicy.NON_ICEBERG); + Constants.IcebergSerializationPolicy.COMPATIBLE); } public void beforeIceberg(