From bb791d5020b6fae4aa51f06c2de3d9da49fda7dc Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 4 Oct 2024 01:21:53 +0000 Subject: [PATCH] Address comments w/o NDV --- .../streaming/internal/AbstractRowBuffer.java | 8 +- .../streaming/internal/BlobBuilder.java | 17 +---- .../internal/ChannelFlushContext.java | 11 ++- .../internal/ClientBufferParameters.java | 29 +------ .../ingest/streaming/internal/EpInfo.java | 9 ++- .../internal/InternalParameterProvider.java | 5 -- .../internal/OpenChannelResponse.java | 11 +++ .../streaming/internal/ParquetFlusher.java | 13 +++- .../streaming/internal/ParquetRowBuffer.java | 9 +-- .../streaming/internal/RowBufferStats.java | 4 +- ...nowflakeStreamingIngestChannelFactory.java | 11 ++- ...owflakeStreamingIngestChannelInternal.java | 18 ++++- ...nowflakeStreamingIngestClientInternal.java | 2 + .../net/snowflake/ingest/utils/Constants.java | 43 ++++++++++- .../ingest/utils/ParameterProvider.java | 38 ---------- .../net/snowflake/ingest/utils/Utils.java | 27 ++++--- .../parquet/hadoop/BdecParquetWriter.java | 3 +- .../streaming/internal/BlobBuilderTest.java | 11 ++- .../streaming/internal/ChannelDataTest.java | 27 ++++--- .../streaming/internal/FlushServiceTest.java | 7 +- .../IcebergParquetValueParserTest.java | 45 +++++------ .../internal/RowBufferStatsTest.java | 76 +++++++++++-------- .../streaming/internal/RowBufferTest.java | 49 +++++++----- .../SnowflakeParquetValueParserTest.java | 40 +++++----- .../SnowflakeStreamingIngestClientTest.java | 16 +++- 25 files changed, 298 insertions(+), 231 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 0fa0f39c4..ba15b0f5a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -641,21 +641,19 @@ public synchronized void close(String name) { * * @param rowCount: count of rows in the given buffer * @param colStats: map of column name to RowBufferStats - * @param setDefaultValues: whether to set default values for null fields the EPs + * @param setDefaultValues: whether to set default values for null fields and NDV in the EPs * @return the EPs built from column stats */ static EpInfo buildEpInfoFromStats( long rowCount, Map colStats, boolean setDefaultValues) { - EpInfo epInfo = new EpInfo(rowCount, new HashMap<>()); - boolean enableDistinctValues = false; + EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), !setDefaultValues); for (Map.Entry colStat : colStats.entrySet()) { RowBufferStats stat = colStat.getValue(); - enableDistinctValues = stat.isEnableDistinctValue(); FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues); String colName = colStat.getValue().getColumnDisplayName(); epInfo.getColumnEps().put(colName, dto); } - epInfo.verifyEpInfo(enableDistinctValues); + epInfo.verifyEpInfo(); return epInfo; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 8a6599407..a87867e6a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -33,6 +33,7 @@ import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.Utils; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.hadoop.ParquetFileWriter; /** * Build a single blob file that contains file header plus data. The header will be a @@ -98,13 +99,6 @@ static Blob constructBlobAndMetadata( byte[] paddedChunkData = paddedChunk.getFirst(); chunkLength = paddedChunk.getSecond(); - if (internalParameterProvider.getComputeExtendedMetadataSize()) { - extendedMetadataSize = - Utils.getLittleEndianInt( - paddedChunkData, - chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES); - } - // Encrypt the compressed chunk data, the encryption key is derived using the key from // server with the full blob path. // We need to maintain IV as a block counter for the whole file, even interleaved, @@ -120,11 +114,8 @@ static Blob constructBlobAndMetadata( chunkLength = compressedChunkData.length; compressedChunkDataSize = chunkLength; - if (internalParameterProvider.getComputeExtendedMetadataSize()) { - extendedMetadataSize = - Utils.getLittleEndianInt( - compressedChunkData, - chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES); + if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { + extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength); } } @@ -156,7 +147,7 @@ static Blob constructBlobAndMetadata( if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { chunkMetadataBuilder - .setMajorVersion(Constants.PARQUET_MAJOR_VERSION) + .setMajorVersion(ParquetFileWriter.CURRENT_VERSION) .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index fe9542267..1de924cc5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * Channel immutable identification and encryption attributes. @@ -29,6 +30,8 @@ class ChannelFlushContext { // Data encryption key id private final Long encryptionKeyId; + private final ParquetProperties.WriterVersion parquetWriterVersion; + ChannelFlushContext( String name, String dbName, @@ -36,7 +39,8 @@ class ChannelFlushContext { String tableName, Long channelSequencer, String encryptionKey, - Long encryptionKeyId) { + Long encryptionKeyId, + ParquetProperties.WriterVersion parquetWriterVersion) { this.name = name; this.fullyQualifiedName = Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); @@ -47,6 +51,7 @@ class ChannelFlushContext { this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; + this.parquetWriterVersion = parquetWriterVersion; } @Override @@ -115,4 +120,8 @@ String getEncryptionKey() { Long getEncryptionKeyId() { return encryptionKeyId; } + + ParquetProperties.WriterVersion getParquetWriterVersion() { + return parquetWriterVersion; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 110e6383e..4ca64962c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -7,7 +7,6 @@ import java.util.Optional; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; -import org.apache.parquet.column.ParquetProperties; /** Channel's buffer relevant parameters that are set at the owning client level. */ public class ClientBufferParameters { @@ -24,10 +23,6 @@ public class ClientBufferParameters { private final Optional maxRowGroups; - private ParquetProperties.WriterVersion parquetWriterVersion; - - private boolean enableDictionaryEncoding; - private boolean isIcebergMode; /** @@ -43,16 +38,12 @@ private ClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding, boolean isIcebergMode) { this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; this.maxRowGroups = maxRowGroups; - this.parquetWriterVersion = parquetWriterVersion; - this.enableDictionaryEncoding = enableDictionaryEncoding; this.isIcebergMode = isIcebergMode; } @@ -74,14 +65,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; - this.parquetWriterVersion = - clientInternal != null - ? clientInternal.getParameterProvider().getParquetWriterVersion() - : ParameterProvider.PARQUET_WRITER_VERSION_DEFAULT; - this.enableDictionaryEncoding = - clientInternal != null - ? clientInternal.getParameterProvider().isEnableParquetDictionaryEncoding() - : ParameterProvider.ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT; this.maxRowGroups = isIcebergMode ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT) @@ -104,8 +87,6 @@ public static ClientBufferParameters test_createClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding, boolean isIcebergMode) { return new ClientBufferParameters( maxChunkSizeInBytes, @@ -113,8 +94,6 @@ public static ClientBufferParameters test_createClientBufferParameters( bdecParquetCompression, enableNewJsonParsingLogic, maxRowGroups, - parquetWriterVersion, - enableDictionaryEncoding, isIcebergMode); } @@ -146,11 +125,7 @@ public String getParquetMessageTypeName() { return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME; } - public ParquetProperties.WriterVersion getParquetWriterVersion() { - return parquetWriterVersion; - } - - public boolean isEnableDictionaryEncoding() { - return enableDictionaryEncoding; + public boolean isEnableDistinctValues() { + return isIcebergMode; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java index 3a0513e4c..ed3f7fcce 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java @@ -13,16 +13,19 @@ class EpInfo { private Map columnEps; + private boolean enableDistinctValues; + /** Default constructor, needed for Jackson */ EpInfo() {} - EpInfo(long rowCount, Map columnEps) { + EpInfo(long rowCount, Map columnEps, boolean enableDistinctValues) { this.rowCount = rowCount; this.columnEps = columnEps; + this.enableDistinctValues = enableDistinctValues; } /** Some basic verification logic to make sure the EP info is correct */ - public void verifyEpInfo(boolean enableDistinctValues) { + public void verifyEpInfo() { for (Map.Entry entry : columnEps.entrySet()) { String colName = entry.getKey(); FileColumnProperties colEp = entry.getValue(); @@ -35,7 +38,7 @@ public void verifyEpInfo(boolean enableDistinctValues) { colName, colEp.getNullCount(), rowCount)); } - // Make sure the NDV should always be -1 when the NDV is not enabled + // Make sure the NDV should always be -1 when the NDV set to default if (!enableDistinctValues && colEp.getDistinctValues() != EP_NDV_UNKNOWN) { throw new SFException( ErrorCode.INTERNAL_ERROR, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index bf6e07036..11a2858f6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -31,9 +31,4 @@ boolean setIcebergSpecificFieldsInEp() { // in the EP metadata, createdOn, and extendedMetadataSize. return isIcebergMode; } - - boolean getComputeExtendedMetadataSize() { - // When in Iceberg mode, extendedMetadataSize is computed. Otherwise, it is -1. - return isIcebergMode; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index 92f7ea8c5..dce6f060f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import net.snowflake.ingest.utils.Constants; /** Response to the OpenChannelRequest */ class OpenChannelResponse extends StreamingIngestResponse { @@ -22,6 +23,7 @@ class OpenChannelResponse extends StreamingIngestResponse { private String encryptionKey; private Long encryptionKeyId; private FileLocationInfo icebergLocationInfo; + private String icebergSerializationPolicy; @JsonProperty("status_code") void setStatusCode(Long statusCode) { @@ -140,4 +142,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) { FileLocationInfo getIcebergLocationInfo() { return this.icebergLocationInfo; } + + @JsonProperty("iceberg_serialization_policy") + void setIcebergSerializationPolicy(String icebergSerializationPolicy) { + this.icebergSerializationPolicy = icebergSerializationPolicy; + } + + Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() { + return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 5b11996ec..3ee240226 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -31,7 +31,6 @@ public class ParquetFlusher implements Flusher { private final Optional maxRowGroups; private final Constants.BdecParquetCompression bdecParquetCompression; - private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean enableDictionaryEncoding; /** Construct parquet flusher from its schema. */ @@ -40,13 +39,11 @@ public ParquetFlusher( long maxChunkSizeInBytes, Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, - ParquetProperties.WriterVersion parquetWriterVersion, boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; - this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; } @@ -69,6 +66,7 @@ private SerializationResult serializeFromJavaObjects( BdecParquetWriter parquetWriter; ByteArrayOutputStream mergedData = new ByteArrayOutputStream(); Pair chunkMinMaxInsertTimeInMs = null; + ParquetProperties.WriterVersion parquetWriterVersion = null; for (ChannelData data : channelsDataPerTable) { // Create channel metadata @@ -110,6 +108,15 @@ private SerializationResult serializeFromJavaObjects( chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); } + // Check if all the channels have the same parquet writer version + if (parquetWriterVersion == null) { + parquetWriterVersion = data.getChannelContext().getParquetWriterVersion(); + } else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Parquet writer version and storage serialization policy mismatch within a chunk"); + } + rows.addAll(data.getVectors().rows); rowCount += data.getRowCount(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index adb8eb9b0..4ae9d9ff7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -106,7 +106,7 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValues())); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { @@ -121,7 +121,7 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValues())); } } @@ -195,7 +195,7 @@ public void setupSchema(List columns) { null /* collationDefinitionString */, ordinal, fieldId, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValues())); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { @@ -206,7 +206,7 @@ public void setupSchema(List columns) { null /* collationDefinitionString */, ordinal, fieldId, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValues())); } } } @@ -399,7 +399,6 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), - clientBufferParameters.getParquetWriterVersion(), clientBufferParameters.getIsIcebergMode()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java index 7e54b8d59..d3f293771 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java @@ -62,8 +62,8 @@ class RowBufferStats { reset(); } - RowBufferStats(String columnDisplayName) { - this(columnDisplayName, null, -1, null, false); + RowBufferStats(String columnDisplayName, boolean enableDistinctValues) { + this(columnDisplayName, null, -1, null, enableDistinctValues); } void reset() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index a56b82ed5..9c39adc80 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -8,6 +8,7 @@ import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** Builds a Streaming Ingest channel for a specific Streaming Ingest client */ class SnowflakeStreamingIngestChannelFactory { @@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder { private OpenChannelRequest.OnErrorOption onErrorOption; private ZoneId defaultTimezone; private OffsetTokenVerificationFunction offsetTokenVerificationFunction; + private ParquetProperties.WriterVersion parquetWriterVersion; private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; @@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder setOffsetTokenVerificationFunction( return this; } + SnowflakeStreamingIngestChannelBuilder setParquetWriterVersion( + ParquetProperties.WriterVersion parquetWriterVersion) { + this.parquetWriterVersion = parquetWriterVersion; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -124,7 +132,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.onErrorOption, this.defaultTimezone, this.owningClient.getParameterProvider().getBlobFormatVersion(), - this.offsetTokenVerificationFunction); + this.offsetTokenVerificationFunction, + this.parquetWriterVersion); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4e884387b..af12b31ea 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -30,6 +30,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestChannel @@ -107,7 +108,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn onErrorOption, defaultTimezone, client.getParameterProvider().getBlobFormatVersion(), - null); + null /* offsetTokenVerificationFunction */, + null /* parquetWriterVersion */); } /** Default constructor */ @@ -125,7 +127,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, Constants.BdecVersion bdecVersion, - OffsetTokenVerificationFunction offsetTokenVerificationFunction) { + OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion) { this.isClosed = false; this.owningClient = client; @@ -141,7 +144,16 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance(); this.channelFlushContext = new ChannelFlushContext( - name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); + name, + dbName, + schemaName, + tableName, + channelSequencer, + encryptionKey, + encryptionKeyId, + parquetWriterVersion == null + ? ParquetProperties.DEFAULT_WRITER_VERSION + : parquetWriterVersion); this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true); this.rowBuffer = AbstractRowBuffer.createRowBuffer( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index fd1c0e38a..c57745c02 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -386,6 +386,8 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .setParquetWriterVersion( + response.getIcebergSerializationPolicy().getParquetWriterVersion()) .build(); // Setup the row buffer schema diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index aeb3682d2..148339ea4 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.utils; import java.util.Arrays; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** Contains all the constants needed for Streaming Ingest */ @@ -71,11 +72,47 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; - public static final int PARQUET_MAJOR_VERSION = 1; public static final int PARQUET_MINOR_VERSION = 0; - // Length of the magic bytes "PAR1" or "PARE" in Parquet file - public static final int PARQUET_MAGIC_BYTES_LENGTH = 4; + /** + * Iceberg table serialization policy. Use v2 parquet writer for optimized serialization, + * otherwise v1. + */ + public enum IcebergSerializationPolicy { + NON_ICEBERG, + COMPATIBLE, + OPTIMIZED; + + public static IcebergSerializationPolicy fromName(String name) { + if (name == null) { + return NON_ICEBERG; + } + for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) { + if (e.name().equalsIgnoreCase(name)) { + return e; + } + } + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + name, Arrays.asList(IcebergSerializationPolicy.values()))); + } + + public ParquetProperties.WriterVersion getParquetWriterVersion() { + switch (this) { + case NON_ICEBERG: + case COMPATIBLE: + return ParquetProperties.WriterVersion.PARQUET_1_0; + case OPTIMIZED: + return ParquetProperties.WriterVersion.PARQUET_2_0; + default: + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + this.name(), Arrays.asList(IcebergSerializationPolicy.values()))); + } + } + } public enum WriteMode { CLOUD_STORAGE, diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 0ff0bc30d..3d04aa1b2 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -10,7 +10,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.parquet.column.ParquetProperties; /** Utility class to provide configurable constants */ public class ParameterProvider { @@ -48,10 +47,6 @@ public class ParameterProvider { public static final String ENABLE_NEW_JSON_PARSING_LOGIC = "ENABLE_NEW_JSON_PARSING_LOGIC".toLowerCase(); - public static final String PARQUET_WRITER_VERSION = "PARQUET_WRITER_VERSION".toLowerCase(); - public static final String ENABLE_PARQUET_DICTIONARY_ENCODING = - "ENABLE_PARQUET_DICTIONARY_ENCODING".toLowerCase(); - // Default values public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000; @@ -79,11 +74,6 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - public static final ParquetProperties.WriterVersion PARQUET_WRITER_VERSION_DEFAULT = - ParquetProperties.WriterVersion.PARQUET_1_0; - - public static final boolean ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT = false; - /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; @@ -263,16 +253,6 @@ private void setParameterMap( props, false); - this.checkAndUpdate( - PARQUET_WRITER_VERSION, PARQUET_WRITER_VERSION_DEFAULT, parameterOverrides, props, false); - - this.checkAndUpdate( - ENABLE_PARQUET_DICTIONARY_ENCODING, - ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT, - parameterOverrides, - props, - false); - if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) { throw new IllegalArgumentException( String.format( @@ -507,24 +487,6 @@ public boolean isEnableNewJsonParsingLogic() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** @return Parquet writer version */ - public ParquetProperties.WriterVersion getParquetWriterVersion() { - Object val = - this.parameterMap.getOrDefault(PARQUET_WRITER_VERSION, PARQUET_WRITER_VERSION_DEFAULT); - if (val instanceof ParquetProperties.WriterVersion) { - return (ParquetProperties.WriterVersion) val; - } - return ParquetProperties.WriterVersion.fromString((String) val); - } - - /** @return Whether to enable dictionary encoding in parquet */ - public boolean isEnableParquetDictionaryEncoding() { - Object val = - this.parameterMap.getOrDefault( - ENABLE_PARQUET_DICTIONARY_ENCODING, ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT); - return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; - } - @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 974f6fa8c..becbfb46a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -8,6 +8,7 @@ import com.codahale.metrics.Timer; import io.netty.util.internal.PlatformDependent; +import java.io.IOException; import java.io.StringReader; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; @@ -29,6 +30,8 @@ import java.util.Properties; import net.snowflake.client.core.SFSessionProperty; import org.apache.commons.codec.binary.Base64; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; @@ -432,16 +435,22 @@ public static String concatDotPath(String... path) { } /** - * Get the little endian int from a byte array with offset + * Get the extended metadata size (footer size) from a parquet file * - * @param bytes the byte array - * @param offset the offset - * @return the little endian int + * @param bytes the serialized parquet file + * @param length the length of the byte array without padding + * @return the extended metadata size */ - public static int getLittleEndianInt(byte[] bytes, int offset) { - return (bytes[offset] & 0xFF) - | ((bytes[offset + 1] & 0xFF) << 8) - | ((bytes[offset + 2] & 0xFF) << 16) - | ((bytes[offset + 3] & 0xFF) << 24); + public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException { + final int magicOffset = length - ParquetFileWriter.MAGIC.length; + final int footerSizeOffset = magicOffset - Integer.BYTES; + if (bytes.length < length + || footerSizeOffset < 0 + || !ParquetFileWriter.MAGIC_STR.equals( + new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) { + throw new IllegalArgumentException("Invalid parquet file"); + } + + return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); } } diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index 2bc69c4b4..c3126847d 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -43,11 +43,12 @@ public class BdecParquetWriter implements AutoCloseable { // Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end // up throwing private final Optional maxRowGroups; - private long rowsWritten = 0; private final ParquetProperties.WriterVersion writerVersion; private final boolean enableDictionaryEncoding; + private long rowsWritten = 0; + /** * Creates a BDEC specific parquet writer. * diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 09f14056f..1fe6a53ad 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -75,7 +75,6 @@ private List> createChannelDataPerTable(int metada 100L, isIceberg ? Optional.of(1) : Optional.empty(), Constants.BdecParquetCompression.GZIP, - ParquetProperties.WriterVersion.PARQUET_1_0, false)) .when(channelData) .createFlusher(); @@ -106,7 +105,15 @@ private List> createChannelDataPerTable(int metada .putIfAbsent( columnName, new RowBufferStats(columnName, null, 1, isIceberg ? 0 : null, isIceberg)); channelData.setChannelContext( - new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L)); + new ChannelFlushContext( + "channel1", + "DB", + "SCHEMA", + "TABLE", + 1L, + "enc", + 1L, + ParquetProperties.DEFAULT_WRITER_VERSION)); return Collections.singletonList(channelData); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index 5b545d697..59b3a231a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -8,13 +8,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ChannelDataTest { + @Parameterized.Parameters(name = "enableDistinctValues: {0}") + public static Object[] enableDistinctValues() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean enableDistinctValues; @Test public void testGetCombinedColumnStatsMapNulls() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); @@ -43,12 +52,12 @@ public void testGetCombinedColumnStatsMapNulls() { @Test public void testGetCombinedColumnStatsMapMissingColumn() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); Map right = new HashMap<>(); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableDistinctValues); right.put("foo", rightStats1); rightStats1.addIntValue(new BigInteger("11")); @@ -78,10 +87,10 @@ public void testGetCombinedColumnStatsMap() { Map left = new HashMap<>(); Map right = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); - RowBufferStats leftStats2 = new RowBufferStats("COL1"); - RowBufferStats rightStats2 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats leftStats2 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats rightStats2 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); left.put("two", leftStats2); @@ -107,7 +116,7 @@ public void testGetCombinedColumnStatsMap() { Assert.assertEquals(new BigInteger("10"), oneCombined.getCurrentMinIntValue()); Assert.assertEquals(new BigInteger("17"), oneCombined.getCurrentMaxIntValue()); - Assert.assertEquals(-1, oneCombined.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 5 : -1, oneCombined.getDistinctValues()); Assert.assertNull(oneCombined.getCurrentMinStrValue()); Assert.assertNull(oneCombined.getCurrentMaxStrValue()); Assert.assertNull(oneCombined.getCurrentMinRealValue()); @@ -117,7 +126,7 @@ public void testGetCombinedColumnStatsMap() { "10".getBytes(StandardCharsets.UTF_8), twoCombined.getCurrentMinStrValue()); Assert.assertArrayEquals( "17".getBytes(StandardCharsets.UTF_8), twoCombined.getCurrentMaxStrValue()); - Assert.assertEquals(-1, twoCombined.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 5 : -1, twoCombined.getDistinctValues()); Assert.assertNull(twoCombined.getCurrentMinIntValue()); Assert.assertNull(twoCombined.getCurrentMaxIntValue()); Assert.assertNull(twoCombined.getCurrentMinRealValue()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index cd7354f09..c3af26782 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -301,6 +301,7 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( onErrorOption, defaultTimezone, Constants.BdecVersion.THREE, + null, null); } @@ -877,8 +878,8 @@ public void testBuildAndUpload() throws Exception { Map eps1 = new HashMap<>(); Map eps2 = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("COL1"); - RowBufferStats stats2 = new RowBufferStats("COL1"); + RowBufferStats stats1 = new RowBufferStats("COL1", isIcebergMode); + RowBufferStats stats2 = new RowBufferStats("COL1", isIcebergMode); eps1.put("one", stats1); eps2.put("one", stats2); @@ -1115,7 +1116,7 @@ public void testBlobBuilder() throws Exception { Map eps1 = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("COL1"); + RowBufferStats stats1 = new RowBufferStats("COL1", isIcebergMode); eps1.put("one", stats1); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 007dc3e23..e617d83d4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -38,7 +38,7 @@ public void parseValueBoolean() { Type type = Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -61,7 +61,7 @@ public void parseValueBoolean() { public void parseValueInt() { Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("INT_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -88,7 +88,7 @@ public void parseValueDecimalToInt() { .as(LogicalTypeAnnotation.decimalType(4, 9)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -115,7 +115,7 @@ public void parseValueDateToInt() { .as(LogicalTypeAnnotation.dateType()) .named("DATE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -139,7 +139,7 @@ public void parseValueDateToInt() { public void parseValueLong() { Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -166,7 +166,7 @@ public void parseValueDecimalToLong() { .as(LogicalTypeAnnotation.decimalType(9, 18)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -193,7 +193,7 @@ public void parseValueTimeToLong() { .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIME_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -220,7 +220,7 @@ public void parseValueTimestampToLong() { .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -247,7 +247,7 @@ public void parseValueTimestampTZToLong() { .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_TZ_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -271,7 +271,7 @@ public void parseValueTimestampTZToLong() { public void parseValueFloat() { Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -295,7 +295,7 @@ public void parseValueFloat() { public void parseValueDouble() { Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -319,7 +319,7 @@ public void parseValueDouble() { public void parseValueBinary() { Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -347,7 +347,7 @@ public void parseValueStringToBinary() { .as(LogicalTypeAnnotation.stringType()) .named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -377,7 +377,7 @@ public void parseValueFixed() { .length(4) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -406,7 +406,7 @@ public void parseValueDecimalToFixed() { .as(LogicalTypeAnnotation.decimalType(10, 20)) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -433,7 +433,7 @@ public void parseList() throws JsonProcessingException { Types.optionalList() .element(Types.optional(PrimitiveTypeName.INT32).named("element")) .named("LIST_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element"); + RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true); Map rowBufferStatsMap = new HashMap() { { @@ -500,8 +500,8 @@ public void parseMap() throws JsonProcessingException { .key(Types.required(PrimitiveTypeName.INT32).named("key")) .value(Types.optional(PrimitiveTypeName.INT32).named("value")) .named("MAP_COL"); - RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key"); - RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value"); + RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true); + RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true); Map rowBufferStatsMap = new HashMap() { { @@ -592,8 +592,8 @@ public void parseStruct() throws JsonProcessingException { .named("b")) .named("STRUCT_COL"); - RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a"); - RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b"); + RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true); + RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b", true); Map rowBufferStatsMap = new HashMap() { { @@ -703,7 +703,7 @@ public void parseNestedTypes() { private static Type generateNestedTypeAndStats( int depth, String name, Map rowBufferStatsMap, String path) { if (depth == 0) { - rowBufferStatsMap.put(path, new RowBufferStats(path)); + rowBufferStatsMap.put(path, new RowBufferStats(path, true)); return Types.optional(PrimitiveTypeName.INT32).named(name); } switch (depth % 3) { @@ -718,7 +718,8 @@ private static Type generateNestedTypeAndStats( .addField(generateNestedTypeAndStats(depth - 1, "a", rowBufferStatsMap, path + ".a")) .named(name); case 0: - rowBufferStatsMap.put(path + ".key_value.key", new RowBufferStats(path + ".key_value.key")); + rowBufferStatsMap.put( + path + ".key_value.key", new RowBufferStats(path + ".key_value.key", true)); return Types.optionalMap() .key(Types.required(PrimitiveTypeName.INT32).named("key")) .value( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index 9f2e848f3..059829f14 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -4,12 +4,22 @@ import java.nio.charset.StandardCharsets; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RowBufferStatsTest { + @Parameterized.Parameters(name = "enableDistinctValues: {0}") + public static Object[] enableDistinctValues() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean enableDistinctValues; + @Test public void testEmptyState() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); Assert.assertNull(stats.getCollationDefinitionString()); Assert.assertNull(stats.getCurrentMinRealValue()); @@ -20,30 +30,30 @@ public void testEmptyState() throws Exception { Assert.assertNull(stats.getCurrentMaxIntValue()); Assert.assertEquals(0, stats.getCurrentNullCount()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 0 : -1, stats.getDistinctValues()); } @Test public void testMinMaxStrNonCol() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); stats.addStrValue("bob"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 1 : -1, stats.getDistinctValues()); stats.addStrValue("charlie"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 2 : -1, stats.getDistinctValues()); stats.addStrValue("alice"); Assert.assertArrayEquals( "alice".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 3 : -1, stats.getDistinctValues()); Assert.assertNull(stats.getCurrentMinRealValue()); Assert.assertNull(stats.getCurrentMaxRealValue()); @@ -55,22 +65,22 @@ public void testMinMaxStrNonCol() throws Exception { @Test public void testMinMaxInt() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); stats.addIntValue(BigInteger.valueOf(5)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 1 : -1, stats.getDistinctValues()); stats.addIntValue(BigInteger.valueOf(6)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 2 : -1, stats.getDistinctValues()); stats.addIntValue(BigInteger.valueOf(4)); Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 3 : -1, stats.getDistinctValues()); Assert.assertNull(stats.getCurrentMinRealValue()); Assert.assertNull(stats.getCurrentMaxRealValue()); @@ -82,22 +92,22 @@ public void testMinMaxInt() throws Exception { @Test public void testMinMaxReal() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); stats.addRealValue(1.0); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 1 : -1, stats.getDistinctValues()); stats.addRealValue(1.5); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 2 : -1, stats.getDistinctValues()); stats.addRealValue(.8); Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - Assert.assertEquals(-1, stats.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 3 : -1, stats.getDistinctValues()); Assert.assertNull(stats.getCurrentMinIntValue()); Assert.assertNull(stats.getCurrentMaxIntValue()); @@ -109,7 +119,7 @@ public void testMinMaxReal() throws Exception { @Test public void testIncCurrentNullCount() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); Assert.assertEquals(0, stats.getCurrentNullCount()); stats.incCurrentNullCount(); @@ -120,7 +130,7 @@ public void testIncCurrentNullCount() throws Exception { @Test public void testMaxLength() throws Exception { - RowBufferStats stats = new RowBufferStats("COL1"); + RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues); Assert.assertEquals(0, stats.getCurrentMaxLength()); stats.setCurrentMaxLength(100L); @@ -132,8 +142,8 @@ public void testMaxLength() throws Exception { @Test public void testGetCombinedStats() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1"); - RowBufferStats two = new RowBufferStats("COL1"); + RowBufferStats one = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats two = new RowBufferStats("COL1", enableDistinctValues); one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -150,7 +160,7 @@ public void testGetCombinedStats() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 7 : -1, result.getDistinctValues()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -159,8 +169,8 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1"); - two = new RowBufferStats("COL1"); + one = new RowBufferStats("COL1", enableDistinctValues); + two = new RowBufferStats("COL1", enableDistinctValues); one.addRealValue(2d); one.addRealValue(4d); @@ -175,7 +185,7 @@ public void testGetCombinedStats() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 7 : -1, result.getDistinctValues()); Assert.assertEquals(0, result.getCurrentNullCount()); Assert.assertNull(result.getCollationDefinitionString()); @@ -185,8 +195,8 @@ public void testGetCombinedStats() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings without collation - one = new RowBufferStats("COL1"); - two = new RowBufferStats("COL1"); + one = new RowBufferStats("COL1", enableDistinctValues); + two = new RowBufferStats("COL1", enableDistinctValues); one.addStrValue("alpha"); one.addStrValue("d"); @@ -205,7 +215,7 @@ public void testGetCombinedStats() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertArrayEquals("a".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue()); Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 7 : -1, result.getDistinctValues()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertEquals(5, result.getCurrentMaxLength()); @@ -218,8 +228,8 @@ public void testGetCombinedStats() throws Exception { @Test public void testGetCombinedStatsNull() throws Exception { // Test for Integers - RowBufferStats one = new RowBufferStats("COL1"); - RowBufferStats two = new RowBufferStats("COL1"); + RowBufferStats one = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats two = new RowBufferStats("COL1", enableDistinctValues); one.addIntValue(BigInteger.valueOf(2)); one.addIntValue(BigInteger.valueOf(4)); @@ -231,7 +241,7 @@ public void testGetCombinedStatsNull() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 4 : -1, result.getDistinctValues()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -240,7 +250,7 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxRealValue()); // Test for Reals - one = new RowBufferStats("COL1"); + one = new RowBufferStats("COL1", enableDistinctValues); one.addRealValue(2d); one.addRealValue(4d); @@ -250,7 +260,7 @@ public void testGetCombinedStatsNull() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 4 : -1, result.getDistinctValues()); Assert.assertEquals(0, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -259,8 +269,8 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertNull(result.getCurrentMaxIntValue()); // Test for Strings - one = new RowBufferStats("COL1"); - two = new RowBufferStats("COL1"); + one = new RowBufferStats("COL1", enableDistinctValues); + two = new RowBufferStats("COL1", enableDistinctValues); one.addStrValue("alpha"); one.addStrValue("d"); @@ -272,7 +282,7 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertArrayEquals( "alpha".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue()); Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); - Assert.assertEquals(-1, result.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 4 : -1, result.getDistinctValues()); Assert.assertEquals(1, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinRealValue()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 8b5ad153d..09451c2cd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -26,6 +26,7 @@ import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang3.StringUtils; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetReader; import org.junit.Assert; import org.junit.Before; @@ -149,8 +150,6 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o Constants.BdecParquetCompression.GZIP, ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, isIcebergMode ? Optional.of(1) : Optional.empty(), - PARQUET_WRITER_VERSION_DEFAULT, - ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT, isIcebergMode), null, null); @@ -563,12 +562,12 @@ private void testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption o public void testBuildEpInfoFromStats() { Map colStats = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("intColumn"); + RowBufferStats stats1 = new RowBufferStats("intColumn", isIcebergMode); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); stats1.addIntValue(BigInteger.valueOf(1)); - RowBufferStats stats2 = new RowBufferStats("strColumn"); + RowBufferStats stats2 = new RowBufferStats("strColumn", isIcebergMode); stats2.addStrValue("alice"); stats2.addStrValue("bob"); stats2.incCurrentNullCount(); @@ -581,7 +580,7 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties strColumnResult = columnResults.get("strColumn"); - Assert.assertEquals(-1, strColumnResult.getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 2 : -1, strColumnResult.getDistinctValues()); Assert.assertEquals( Hex.encodeHexString("alice".getBytes(StandardCharsets.UTF_8)), strColumnResult.getMinStrValue()); @@ -591,7 +590,7 @@ public void testBuildEpInfoFromStats() { Assert.assertEquals(1, strColumnResult.getNullCount()); FileColumnProperties intColumnResult = columnResults.get("intColumn"); - Assert.assertEquals(-1, intColumnResult.getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 3 : -1, intColumnResult.getDistinctValues()); Assert.assertEquals(BigInteger.valueOf(1), intColumnResult.getMinIntValue()); Assert.assertEquals(BigInteger.valueOf(10), intColumnResult.getMaxIntValue()); Assert.assertEquals(0, intColumnResult.getNullCount()); @@ -603,8 +602,8 @@ public void testBuildEpInfoFromNullColumnStats() { final String realColName = "realCol"; Map colStats = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats(intColName); - RowBufferStats stats2 = new RowBufferStats(realColName); + RowBufferStats stats1 = new RowBufferStats(intColName, isIcebergMode); + RowBufferStats stats2 = new RowBufferStats(realColName, isIcebergMode); stats1.incCurrentNullCount(); stats2.incCurrentNullCount(); @@ -616,7 +615,7 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties intColumnResult = columnResults.get(intColName); - Assert.assertEquals(-1, intColumnResult.getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 0 : -1, intColumnResult.getDistinctValues()); Assert.assertEquals( isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue()); @@ -627,7 +626,7 @@ public void testBuildEpInfoFromNullColumnStats() { Assert.assertEquals(0, intColumnResult.getMaxLength()); FileColumnProperties realColumnResult = columnResults.get(realColName); - Assert.assertEquals(-1, intColumnResult.getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 0 : -1, intColumnResult.getDistinctValues()); Assert.assertEquals( isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue()); @@ -642,12 +641,12 @@ public void testBuildEpInfoFromNullColumnStats() { public void testInvalidEPInfo() { Map colStats = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("intColumn"); + RowBufferStats stats1 = new RowBufferStats("intColumn", isIcebergMode); stats1.addIntValue(BigInteger.valueOf(2)); stats1.addIntValue(BigInteger.valueOf(10)); stats1.addIntValue(BigInteger.valueOf(1)); - RowBufferStats stats2 = new RowBufferStats("strColumn"); + RowBufferStats stats2 = new RowBufferStats("strColumn", isIcebergMode); stats2.addStrValue("alice"); stats2.incCurrentNullCount(); stats2.incCurrentNullCount(); @@ -769,33 +768,36 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { Assert.assertEquals( BigInteger.valueOf(10), columnEpStats.get("colTinyInt").getCurrentMinIntValue()); Assert.assertEquals(0, columnEpStats.get("colTinyInt").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("colTinyInt").getDistinctValues()); + Assert.assertEquals( + isIcebergMode ? 2 : -1, columnEpStats.get("colTinyInt").getDistinctValues()); Assert.assertEquals( BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMaxIntValue()); Assert.assertEquals( BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMinIntValue()); Assert.assertEquals(0, columnEpStats.get("COLTINYINT").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("COLTINYINT").getDistinctValues()); + Assert.assertEquals( + isIcebergMode ? 1 : -1, columnEpStats.get("COLTINYINT").getDistinctValues()); Assert.assertEquals( BigInteger.valueOf(3), columnEpStats.get("COLSMALLINT").getCurrentMaxIntValue()); Assert.assertEquals( BigInteger.valueOf(2), columnEpStats.get("COLSMALLINT").getCurrentMinIntValue()); Assert.assertEquals(0, columnEpStats.get("COLSMALLINT").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("COLSMALLINT").getDistinctValues()); + Assert.assertEquals( + isIcebergMode ? 2 : -1, columnEpStats.get("COLSMALLINT").getDistinctValues()); Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMaxIntValue()); Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMinIntValue()); Assert.assertEquals(1L, columnEpStats.get("COLINT").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("COLINT").getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 1 : -1, columnEpStats.get("COLINT").getDistinctValues()); Assert.assertEquals( BigInteger.valueOf(40), columnEpStats.get("COLBIGINT").getCurrentMaxIntValue()); Assert.assertEquals( BigInteger.valueOf(4), columnEpStats.get("COLBIGINT").getCurrentMinIntValue()); Assert.assertEquals(0, columnEpStats.get("COLBIGINT").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("COLBIGINT").getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 2 : -1, columnEpStats.get("COLBIGINT").getDistinctValues()); Assert.assertArrayEquals( "2".getBytes(StandardCharsets.UTF_8), columnEpStats.get("COLCHAR").getCurrentMinStrValue()); @@ -803,7 +805,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { "alice".getBytes(StandardCharsets.UTF_8), columnEpStats.get("COLCHAR").getCurrentMaxStrValue()); Assert.assertEquals(0, columnEpStats.get("COLCHAR").getCurrentNullCount()); - Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues()); + Assert.assertEquals(isIcebergMode ? 2 : -1, columnEpStats.get("COLCHAR").getDistinctValues()); // Confirm we reset ChannelData resetResults = rowBuffer.flush(); @@ -1908,7 +1910,16 @@ public void testParquetFileNameMetadata() throws IOException { bufferUnderTest.setupSchema(Collections.singletonList(colChar)); loadData(bufferUnderTest, Collections.singletonMap("colChar", "a")); ChannelData data = bufferUnderTest.flush(); - data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L)); + data.setChannelContext( + new ChannelFlushContext( + "name", + "db", + "schema", + "table", + 1L, + "key", + 0L, + ParquetProperties.DEFAULT_WRITER_VERSION)); ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher(); Flusher.SerializationResult result = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java index 64db57c31..1397acfd7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java @@ -29,7 +29,7 @@ public void parseValueFixedSB1ToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( 12, @@ -61,7 +61,7 @@ public void parseValueFixedSB2ToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( 1234, @@ -93,7 +93,7 @@ public void parseValueFixedSB4ToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789, @@ -125,7 +125,7 @@ public void parseValueFixedSB8ToInt64() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789987654321L, @@ -157,7 +157,7 @@ public void parseValueFixedSB16ToByteArray() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("91234567899876543219876543211234567891"), @@ -191,7 +191,7 @@ public void parseValueFixedDecimalToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("12345.54321"), @@ -221,7 +221,7 @@ public void parseValueDouble() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( 12345.54321d, @@ -251,7 +251,7 @@ public void parseValueBoolean() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( true, @@ -281,7 +281,7 @@ public void parseValueBinary() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "1234abcd".getBytes(), @@ -326,7 +326,7 @@ private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonPa String var = "{\"key1\":-879869596,\"key2\":\"value2\",\"key3\":null," + "\"key4\":{\"key41\":0.032437,\"key42\":\"value42\",\"key43\":null}}"; - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( var, @@ -376,7 +376,7 @@ private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsin .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( var, @@ -417,7 +417,7 @@ public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) { input.put("b", "2"); input.put("c", "3"); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( input, @@ -455,7 +455,7 @@ public void parseValueTextToBinary() { String text = "This is a sample text! Length is bigger than 32 bytes :)"; - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( text, @@ -492,7 +492,7 @@ public void parseValueTimestampNtzSB4Error() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); SFException exception = Assert.assertThrows( SFException.class, @@ -520,7 +520,7 @@ public void parseValueTimestampNtzSB8ToINT64() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "2013-04-28T20:57:01.000", @@ -551,7 +551,7 @@ public void parseValueTimestampNtzSB16ToByteArray() { .scale(9) // nanos .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "2022-09-18T22:05:07.123456789", @@ -583,7 +583,7 @@ public void parseValueDateToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "2021-01-01", @@ -614,7 +614,7 @@ public void parseValueTimeSB4ToInt32() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00", @@ -645,7 +645,7 @@ public void parseValueTimeSB8ToInt64() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); ParquetBufferValue pv = SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00.123", @@ -676,7 +676,7 @@ public void parseValueTimeSB16Error() { .nullable(true) .build(); - RowBufferStats rowBufferStats = new RowBufferStats("COL1"); + RowBufferStats rowBufferStats = new RowBufferStats("COL1", false); SFException exception = Assert.assertThrows( SFException.class, diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 0dbeeebee..cc42ac2c9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -130,6 +130,7 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); channel2 = new SnowflakeStreamingIngestChannelInternal<>( @@ -146,6 +147,7 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); channel3 = new SnowflakeStreamingIngestChannelInternal<>( @@ -162,6 +164,7 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); channel4 = new SnowflakeStreamingIngestChannelInternal<>( @@ -178,6 +181,7 @@ public void setup() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); } @@ -375,6 +379,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); ChannelsStatusRequest.ChannelStatusRequestDTO dto = @@ -437,6 +442,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); try { @@ -490,6 +496,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); ChannelMetadata channelMetadata = @@ -500,8 +507,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { .build(); Map columnEps = new HashMap<>(); - columnEps.put("column", new RowBufferStats("COL1")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode); + columnEps.put("column", new RowBufferStats("COL1", isIcebergMode)); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode); ChunkMetadata chunkMetadata = ChunkMetadata.builder() @@ -549,8 +556,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { private Pair, Set> getRetryBlobMetadata() { Map columnEps = new HashMap<>(); - columnEps.put("column", new RowBufferStats("COL1")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode); + columnEps.put("column", new RowBufferStats("COL1", isIcebergMode)); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode); ChannelMetadata channelMetadata1 = ChannelMetadata.builder() @@ -1250,6 +1257,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { OpenChannelRequest.OnErrorOption.CONTINUE, ZoneOffset.UTC, BDEC_VERSION, + null, null); ChannelsStatusRequest.ChannelStatusRequestDTO dto =