From 7356bee2ff478e2b722550b04351dbcf86755a9a Mon Sep 17 00:00:00 2001
From: Hitesh Madan
Date: Fri, 11 Oct 2024 22:28:35 -0700
Subject: [PATCH] (Stacked PR) Code review changes for Parquet V2 (#862)

1. Remove logic from the OpenChannelResponse contract class.
2. Move writerVersion defaulting to the channel-construction call site in
   clientInternal (from the channel ctor), instead of passing
   writerVersion=null into the channel.
3. Pass writerVersion to the Flusher via the RowBuffer, instead of via the
   per-chunk flushContext.
4. Remove a test-only overload of the ChannelInternal ctor.
5. Remove an unnecessary parameter on the ChannelInternal ctor (bdecVersion).
6. Remove SerializationPolicy.NON_ICEBERG and the custom SerPolicy.fromName
   method; use the Enum.valueOf that Java already provides.
---
 .../streaming/internal/AbstractRowBuffer.java |  3 +
 .../internal/ChannelFlushContext.java         | 11 +--
 .../internal/OpenChannelResponse.java         |  5 +-
 .../streaming/internal/ParquetFlusher.java    | 13 +---
 .../streaming/internal/ParquetRowBuffer.java  |  6 ++
 ...nowflakeStreamingIngestChannelFactory.java |  1 -
 ...owflakeStreamingIngestChannelInternal.java | 65 ++---------------
 ...nowflakeStreamingIngestClientInternal.java | 23 ++++--
 .../net/snowflake/ingest/utils/Constants.java | 19 +----
 .../streaming/internal/BlobBuilderTest.java   | 15 +---
 .../streaming/internal/ChannelCacheTest.java  | 37 ++++++++--
 .../streaming/internal/FlushServiceTest.java  | 13 +++-
 .../internal/InsertRowsBenchmarkTest.java     |  7 +-
 .../streaming/internal/RowBufferTest.java     | 16 +---
 .../SnowflakeStreamingIngestChannelTest.java  | 73 ++++++++++++++++---
 .../SnowflakeStreamingIngestClientTest.java   | 27 ++++---
 .../datatypes/AbstractDataTypeTest.java       |  2 +-
 17 files changed, 172 insertions(+), 164 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 7b0adaabc..fa502f30a 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -25,6 +25,7 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.column.ParquetProperties;
 
 /**
  * The abstract implementation of the buffer in the Streaming Ingest channel that holds the
@@ -668,6 +669,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
       ChannelRuntimeState channelRuntimeState,
       ClientBufferParameters clientBufferParameters,
       OffsetTokenVerificationFunction offsetTokenVerificationFunction,
+      ParquetProperties.WriterVersion parquetWriterVersion,
       TelemetryService telemetryService) {
     switch (bdecVersion) {
       case THREE:
@@ -681,6 +683,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
             channelRuntimeState,
             clientBufferParameters,
             offsetTokenVerificationFunction,
+            parquetWriterVersion,
             telemetryService);
       default:
         throw new SFException(
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
index 1de924cc5..fe9542267 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
@@ -5,7 +5,6 @@
 package net.snowflake.ingest.streaming.internal;
 
 import net.snowflake.ingest.utils.Utils;
-import org.apache.parquet.column.ParquetProperties;
 
 /**
  * Channel immutable identification and encryption attributes.
@@ -30,8 +29,6 @@ class ChannelFlushContext {
   // Data encryption key id
   private final Long encryptionKeyId;
 
-  private final ParquetProperties.WriterVersion parquetWriterVersion;
-
   ChannelFlushContext(
       String name,
       String dbName,
@@ -39,8 +36,7 @@ class ChannelFlushContext {
       String tableName,
       Long channelSequencer,
       String encryptionKey,
-      Long encryptionKeyId,
-      ParquetProperties.WriterVersion parquetWriterVersion) {
+      Long encryptionKeyId) {
     this.name = name;
     this.fullyQualifiedName =
         Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
@@ -51,7 +47,6 @@ class ChannelFlushContext {
     this.channelSequencer = channelSequencer;
     this.encryptionKey = encryptionKey;
     this.encryptionKeyId = encryptionKeyId;
-    this.parquetWriterVersion = parquetWriterVersion;
   }
 
   @Override
@@ -120,8 +115,4 @@ String getEncryptionKey() {
   Long getEncryptionKeyId() {
     return encryptionKeyId;
   }
-
-  ParquetProperties.WriterVersion getParquetWriterVersion() {
-    return parquetWriterVersion;
-  }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java
index dce6f060f..0058624af 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java
@@ -6,7 +6,6 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
-import net.snowflake.ingest.utils.Constants;
 
 /** Response to the OpenChannelRequest */
 class OpenChannelResponse extends StreamingIngestResponse {
@@ -148,7 +147,7 @@ void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
     this.icebergSerializationPolicy = icebergSerializationPolicy;
   }
 
-  Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() {
-    return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy);
+  String getIcebergSerializationPolicy() {
+    return this.icebergSerializationPolicy;
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index 3ee240226..5b11996ec 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -31,6 +31,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private final Optional<Integer> maxRowGroups;
 
   private final Constants.BdecParquetCompression bdecParquetCompression;
+  private final ParquetProperties.WriterVersion parquetWriterVersion;
   private final boolean enableDictionaryEncoding;
 
   /** Construct parquet flusher from its schema. */
@@ -39,11 +40,13 @@ public ParquetFlusher(
       long maxChunkSizeInBytes,
       Optional<Integer> maxRowGroups,
       Constants.BdecParquetCompression bdecParquetCompression,
+      ParquetProperties.WriterVersion parquetWriterVersion,
       boolean enableDictionaryEncoding) {
     this.schema = schema;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxRowGroups = maxRowGroups;
     this.bdecParquetCompression = bdecParquetCompression;
+    this.parquetWriterVersion = parquetWriterVersion;
     this.enableDictionaryEncoding = enableDictionaryEncoding;
   }
 
@@ -66,7 +69,6 @@ private SerializationResult serializeFromJavaObjects(
     BdecParquetWriter parquetWriter;
     ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
     Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
-    ParquetProperties.WriterVersion parquetWriterVersion = null;
 
     for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
       // Create channel metadata
@@ -108,15 +110,6 @@ private SerializationResult serializeFromJavaObjects(
             chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
       }
 
-      // Check if all the channels have the same parquet writer version
-      if (parquetWriterVersion == null) {
-        parquetWriterVersion = data.getChannelContext().getParquetWriterVersion();
-      } else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) {
-        throw new SFException(
-            ErrorCode.INTERNAL_ERROR,
-            "Parquet writer version and storage serialization policy mismatch within a chunk");
-      }
-
       rows.addAll(data.getVectors().rows);
       rowCount += data.getRowCount();
 
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 78dba9cce..5e3fa1191 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -26,6 +26,7 @@
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
   private final List<List<Object>> data;
   private final List<List<Object>> tempData;
 
+  private final ParquetProperties.WriterVersion parquetWriterVersion;
+
   private MessageType schema;
 
   /** Construct a ParquetRowBuffer object. */
@@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
       ChannelRuntimeState channelRuntimeState,
       ClientBufferParameters clientBufferParameters,
       OffsetTokenVerificationFunction offsetTokenVerificationFunction,
+      ParquetProperties.WriterVersion parquetWriterVersion,
       TelemetryService telemetryService) {
     super(
         onErrorOption,
@@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
     this.metadata = new HashMap<>();
     this.data = new ArrayList<>();
     this.tempData = new ArrayList<>();
+    this.parquetWriterVersion = parquetWriterVersion;
   }
 
   /**
@@ -395,6 +400,7 @@ public Flusher<ParquetChunkData> createFlusher() {
         clientBufferParameters.getMaxChunkSizeInBytes(),
         clientBufferParameters.getMaxRowGroups(),
         clientBufferParameters.getBdecParquetCompression(),
+        parquetWriterVersion,
         clientBufferParameters.isEnableDictionaryEncoding());
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
index 9c39adc80..40542c8b0 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java
@@ -131,7 +131,6 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
           this.encryptionKeyId,
           this.onErrorOption,
          this.defaultTimezone,
-          this.owningClient.getParameterProvider().getBlobFormatVersion(),
           this.offsetTokenVerificationFunction,
           this.parquetWriterVersion);
     }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index af12b31ea..167240ed7 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -10,7 +10,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -19,13 +18,13 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import net.snowflake.ingest.streaming.DropChannelRequest;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
-import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.SFException;
@@ -69,49 +68,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
   private final MemoryInfoProvider memoryInfoProvider;
   private volatile long freeMemoryInBytes = 0;
 
-  /**
-   * Constructor for TESTING ONLY which allows us to set the test mode
-   *
-   * @param name
-   * @param dbName
-   * @param schemaName
-   * @param tableName
-   * @param offsetToken
-   * @param channelSequencer
-   * @param rowSequencer
-   * @param client
-   */
-  SnowflakeStreamingIngestChannelInternal(
-      String name,
-      String dbName,
-      String schemaName,
-      String tableName,
-      String offsetToken,
-      Long channelSequencer,
-      Long rowSequencer,
-      SnowflakeStreamingIngestClientInternal<T> client,
-      String encryptionKey,
-      Long encryptionKeyId,
-      OpenChannelRequest.OnErrorOption onErrorOption,
-      ZoneOffset defaultTimezone) {
-    this(
-        name,
-        dbName,
-        schemaName,
-        tableName,
-        offsetToken,
-        channelSequencer,
-        rowSequencer,
-        client,
-        encryptionKey,
-        encryptionKeyId,
-        onErrorOption,
-        defaultTimezone,
-        client.getParameterProvider().getBlobFormatVersion(),
-        null /* offsetTokenVerificationFunction */,
-        null /* parquetWriterVersion */);
-  }
-
   /** Default constructor */
   SnowflakeStreamingIngestChannelInternal(
       String name,
@@ -121,12 +77,11 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
       String endOffsetToken,
       Long channelSequencer,
      Long rowSequencer,
-      SnowflakeStreamingIngestClientInternal<T> client,
+      @Nonnull SnowflakeStreamingIngestClientInternal<T> client,
       String encryptionKey,
       Long encryptionKeyId,
       OpenChannelRequest.OnErrorOption onErrorOption,
       ZoneId defaultTimezone,
-      Constants.BdecVersion bdecVersion,
       OffsetTokenVerificationFunction offsetTokenVerificationFunction,
       ParquetProperties.WriterVersion parquetWriterVersion) {
     this.isClosed = false;
@@ -144,28 +99,20 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
     this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance();
     this.channelFlushContext =
         new ChannelFlushContext(
-            name,
-            dbName,
-            schemaName,
-            tableName,
-            channelSequencer,
-            encryptionKey,
-            encryptionKeyId,
-            parquetWriterVersion == null
-                ? ParquetProperties.DEFAULT_WRITER_VERSION
-                : parquetWriterVersion);
+            name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
     this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true);
     this.rowBuffer =
         AbstractRowBuffer.createRowBuffer(
             onErrorOption,
             defaultTimezone,
-            bdecVersion,
+            client.getParameterProvider().getBlobFormatVersion(),
             getFullyQualifiedName(),
             this::collectRowSize,
             channelState,
             new ClientBufferParameters(owningClient),
             offsetTokenVerificationFunction,
-            owningClient == null ? null : owningClient.getTelemetryService());
+            parquetWriterVersion,
+            owningClient.getTelemetryService());
     this.tableColumns = new HashMap<>();
     logger.logInfo(
         "Channel={} created for table={}",
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index ee681775b..7751499f1 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -71,6 +71,7 @@
 import net.snowflake.ingest.utils.SFException;
 import net.snowflake.ingest.utils.SnowflakeURL;
 import net.snowflake.ingest.utils.Utils;
+import org.apache.parquet.column.ParquetProperties;
 
 /**
  * The first version of implementation for SnowflakeStreamingIngestClient. The client internally
@@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal<T> openChannel(OpenChannelRequest
       throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
     }
 
-    if (isIcebergMode
-        && response.getTableColumns().stream()
-            .anyMatch(c -> c.getSourceIcebergDataType() == null)) {
-      throw new SFException(
-          ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
+    if (isIcebergMode) {
+      if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) {
+        throw new SFException(
+            ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
+      }
+
+      if (response.getIcebergSerializationPolicy() == null) {
+        throw new SFException(
+            ErrorCode.INTERNAL_ERROR,
+            "Iceberg Table's open channel response does not have serialization policy set.");
+      }
     }
 
     logger.logInfo(
@@ -387,7 +394,11 @@ public SnowflakeStreamingIngestChannelInternal<T> openChannel(OpenChannelRequest
             .setDefaultTimezone(request.getDefaultTimezone())
             .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction())
             .setParquetWriterVersion(
-                response.getIcebergSerializationPolicy().getParquetWriterVersion())
+                isIcebergMode
+                    ? Constants.IcebergSerializationPolicy.valueOf(
+                            response.getIcebergSerializationPolicy())
+                        .toParquetWriterVersion()
+                    : ParquetProperties.WriterVersion.PARQUET_1_0)
             .build();
 
     // Setup the row buffer schema
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 148339ea4..7198c7669 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -79,28 +79,11 @@ public class Constants {
    * otherwise v1.
    */
   public enum IcebergSerializationPolicy {
-    NON_ICEBERG,
     COMPATIBLE,
     OPTIMIZED;
 
-    public static IcebergSerializationPolicy fromName(String name) {
-      if (name == null) {
-        return NON_ICEBERG;
-      }
-      for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) {
-        if (e.name().equalsIgnoreCase(name)) {
-          return e;
-        }
-      }
-      throw new IllegalArgumentException(
-          String.format(
-              "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s",
-              name, Arrays.asList(IcebergSerializationPolicy.values())));
-    }
-
-    public ParquetProperties.WriterVersion getParquetWriterVersion() {
+    public ParquetProperties.WriterVersion toParquetWriterVersion() {
       switch (this) {
-        case NON_ICEBERG:
         case COMPATIBLE:
           return ParquetProperties.WriterVersion.PARQUET_1_0;
         case OPTIMIZED:
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index cf254c193..04a740272 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -78,6 +78,9 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
                 100L,
                 isIceberg ? Optional.of(1) : Optional.empty(),
                 Constants.BdecParquetCompression.GZIP,
+                isIceberg
+                    ? ParquetProperties.WriterVersion.PARQUET_2_0
+                    : ParquetProperties.WriterVersion.PARQUET_1_0,
                 isIceberg))
         .when(channelData)
         .createFlusher();
@@ -121,17 +124,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
                     .named("test"))
             : new RowBufferStats(columnName, null, 1, null, null));
     channelData.setChannelContext(
-        new ChannelFlushContext(
-            "channel1",
-            "DB",
-            "SCHEMA",
-            "TABLE",
-            1L,
-            "enc",
-            1L,
-            isIceberg
-                ? ParquetProperties.WriterVersion.PARQUET_2_0
-                : ParquetProperties.WriterVersion.PARQUET_1_0));
+        new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L));
 
     return Collections.singletonList(channelData);
   }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
index ef3684774..d9a207ee8 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java
@@ -13,6 +13,7 @@
 import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
 import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +62,11 @@ public void setup() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channel2 =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel2",
@@ -75,7 +80,11 @@ public void setup() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channel3 =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel3",
@@ -89,7 +98,11 @@ public void setup() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     cache.addChannel(channel1);
     cache.addChannel(channel2);
     cache.addChannel(channel3);
@@ -115,7 +128,11 @@ public void testAddChannel() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     cache.addChannel(channel);
     Assert.assertEquals(1, cache.getSize());
     Assert.assertTrue(channel == cache.entrySet().iterator().next().getValue().get(channelName));
@@ -133,7 +150,11 @@ public void testAddChannel() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     cache.addChannel(channelDup);
     // The old channel should be invalid now
     Assert.assertTrue(!channel.isValid());
@@ -214,7 +235,11 @@ public void testRemoveChannel() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     cache.removeChannelIfSequencersMatch(channel3Dup);
     // Verify that remove the same channel with a different channel sequencer is a no op
     Assert.assertEquals(1, cache.getSize());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index 9043d2ff5..a7e4ba35b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -303,7 +303,6 @@ SnowflakeStreamingIngestChannelInternal<List<List<Object>>> createChannel(
         encryptionKeyId,
         onErrorOption,
         defaultTimezone,
-        Constants.BdecVersion.THREE,
         null,
         isIcebergMode
            ? ParquetProperties.WriterVersion.PARQUET_2_0
@@ -1065,7 +1064,11 @@ public void testInvalidateChannels() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            ZoneOffset.UTC);
+            ZoneOffset.UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     SnowflakeStreamingIngestChannelInternal channel2 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -1080,7 +1083,11 @@ public void testInvalidateChannels() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            ZoneOffset.UTC);
+            ZoneOffset.UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channelCache.addChannel(channel1);
     channelCache.addChannel(channel2);
 
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java
index ebe6742a2..b2b81adad 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java
@@ -15,6 +15,7 @@
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.Utils;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,7 +79,11 @@ public void setUpBeforeAll() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     // Setup column fields and vectors
     ColumnMetadata col = new ColumnMetadata();
     col.setOrdinal(1);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 5ce58371e..e1cb764dd 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -156,6 +156,9 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
             isIcebergMode ? Optional.of(1) : Optional.empty(),
             isIcebergMode),
         null,
+        isIcebergMode
+            ? ParquetProperties.WriterVersion.PARQUET_2_0
+            : ParquetProperties.WriterVersion.PARQUET_1_0,
         null);
   }
 
@@ -1925,18 +1928,7 @@ public void testParquetFileNameMetadata() throws IOException {
     bufferUnderTest.setupSchema(Collections.singletonList(colChar));
     loadData(bufferUnderTest, Collections.singletonMap("colChar", "a"));
     ChannelData data = bufferUnderTest.flush();
-    data.setChannelContext(
-        new ChannelFlushContext(
-            "name",
-            "db",
-            "schema",
-            "table",
-            1L,
-            "key",
-            0L,
-            isIcebergMode
-                ? ParquetProperties.WriterVersion.PARQUET_2_0
-                : ParquetProperties.WriterVersion.PARQUET_1_0));
+    data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
     Flusher.SerializationResult result =
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
index b3e2d23d6..3c0bdfc2b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
@@ -48,6 +48,7 @@
 import net.snowflake.ingest.utils.SnowflakeURL;
 import net.snowflake.ingest.utils.Utils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -199,7 +200,11 @@ public void testChannelValid() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     Assert.assertTrue(channel.isValid());
     channel.invalidate("from testChannelValid", "Invalidated by test");
@@ -247,7 +252,11 @@ public void testChannelClose() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     Assert.assertFalse(channel.isClosed());
     channel.markClosed();
@@ -545,7 +554,11 @@ public void testInsertRow() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     ColumnMetadata col = new ColumnMetadata();
     col.setOrdinal(1);
@@ -631,7 +644,11 @@ public void testInsertTooLargeRow() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channel.setupSchema(schema);
 
     InsertValidationResponse insertValidationResponse = channel.insertRow(row, "token-1");
@@ -655,7 +672,11 @@ public void testInsertTooLargeRow() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.ABORT,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channel.setupSchema(schema);
 
     try {
@@ -680,7 +701,11 @@ public void testInsertTooLargeRow() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.SKIP_BATCH,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     channel.setupSchema(schema);
 
     insertValidationResponse = channel.insertRow(row, "token-1");
@@ -711,7 +736,11 @@ public void testInsertRowThrottling() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode);
     memoryInfoProvider.freeMemory =
@@ -755,7 +784,11 @@ public void testFlush() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -789,7 +822,11 @@ public void testClose() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -821,7 +858,11 @@ public void testDropOnClose() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -856,7 +897,11 @@ public void testDropOnCloseInvalidChannel() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -887,7 +932,11 @@ public void testGetLatestCommittedOffsetToken() {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
 
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index c468f03ea..627593e82 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -61,6 +61,7 @@
 import net.snowflake.ingest.utils.SnowflakeURL;
 import net.snowflake.ingest.utils.Utils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
 import org.bouncycastle.asn1.nist.NISTObjectIdentifiers;
@@ -131,7 +132,6 @@ public void setup() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
     channel2 =
@@ -148,7 +148,6 @@ public void setup() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
     channel3 =
@@ -165,7 +164,6 @@ public void setup() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
     channel4 =
@@ -182,7 +180,6 @@ public void setup() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
   }
@@ -380,7 +377,6 @@ public void testGetChannelsStatusWithRequest() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
 
@@ -443,7 +439,6 @@ public void testGetChannelsStatusWithRequestError() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
 
@@ -497,7 +492,6 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
 
@@ -1073,7 +1067,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     SnowflakeStreamingIngestChannelInternal channel2 =
         new SnowflakeStreamingIngestChannelInternal<>(
            channel2Name,
@@ -1087,7 +1085,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     client.getChannelCache().addChannel(channel1);
     client.getChannelCache().addChannel(channel2);
 
@@ -1193,7 +1195,11 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception {
             "key",
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
-            UTC);
+            UTC,
+            null /* offsetTokenVerificationFunction */,
+            isIcebergMode
+                ? ParquetProperties.WriterVersion.PARQUET_2_0
+                : ParquetProperties.WriterVersion.PARQUET_1_0);
     client.getChannelCache().addChannel(channel);
 
     ChannelsStatusResponse response = new ChannelsStatusResponse();
@@ -1264,7 +1270,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception {
             1234L,
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
-            BDEC_VERSION,
             null,
             null);
 
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
index a1dfc6e2c..545a4827b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -76,7 +76,7 @@ public void before() throws Exception {
     setUp(
         false /* isIceberg */,
         compressionAlgorithm,
-        Constants.IcebergSerializationPolicy.NON_ICEBERG);
+        Constants.IcebergSerializationPolicy.COMPATIBLE);
   }
 
   public void beforeIceberg(