diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 0fa0f39c4..07336e4e8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -641,21 +641,23 @@ public synchronized void close(String name) { * * @param rowCount: count of rows in the given buffer * @param colStats: map of column name to RowBufferStats - * @param setDefaultValues: whether to set default values for null fields the EPs + * @param setDefaultValues: whether to set default values for null min/max fields in the EPs + * @param enableDistinctValuesCount: whether to include a valid NDV in the EPs * @return the EPs built from column stats */ static EpInfo buildEpInfoFromStats( - long rowCount, Map colStats, boolean setDefaultValues) { - EpInfo epInfo = new EpInfo(rowCount, new HashMap<>()); - boolean enableDistinctValues = false; + long rowCount, + Map colStats, + boolean setDefaultValues, + boolean enableDistinctValuesCount) { + EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount); for (Map.Entry colStat : colStats.entrySet()) { RowBufferStats stat = colStat.getValue(); - enableDistinctValues = stat.isEnableDistinctValue(); FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues); String colName = colStat.getValue().getColumnDisplayName(); epInfo.getColumnEps().put(colName, dto); } - epInfo.verifyEpInfo(enableDistinctValues); + epInfo.verifyEpInfo(); return epInfo; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 8a6599407..6c12170d3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -31,8 +31,10 @@ import net.snowflake.ingest.utils.Cryptor; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; +import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.Utils; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.hadoop.ParquetFileWriter; /** * Build a single blob file that contains file header plus data. The header will be a @@ -68,6 +70,7 @@ static Blob constructBlobAndMetadata( String filePath, List>> blobData, Constants.BdecVersion bdecVersion, + ParameterProvider parameterProvider, InternalParameterProvider internalParameterProvider) throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, @@ -98,13 +101,6 @@ static Blob constructBlobAndMetadata( byte[] paddedChunkData = paddedChunk.getFirst(); chunkLength = paddedChunk.getSecond(); - if (internalParameterProvider.getComputeExtendedMetadataSize()) { - extendedMetadataSize = - Utils.getLittleEndianInt( - paddedChunkData, - chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES); - } - // Encrypt the compressed chunk data, the encryption key is derived using the key from // server with the full blob path.
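
(Illustrative sketch, not part of the diff.) With this change the NDV flag is carried by EpInfo itself, so verifyEpInfo() no longer takes an argument. A minimal caller sketch, assuming same-package access to these package-private classes, the usual java.util/java.math imports, and only the constructors and methods shown in this diff (generic types restored for readability):

    Map<String, RowBufferStats> colStats = new HashMap<>();
    RowBufferStats stats = new RowBufferStats("COL1", true /* enableDistinctValues */);
    stats.addIntValue(BigInteger.valueOf(1));
    stats.addIntValue(BigInteger.valueOf(2));
    colStats.put("COL1", stats);

    EpInfo epInfo =
        AbstractRowBuffer.buildEpInfoFromStats(
            2 /* rowCount */, colStats, true /* setDefaultValues */, true /* enableDistinctValuesCount */);
    // verifyEpInfo() checks each column's null count against rowCount and, when the NDV flag is
    // off, that every column reports the -1 sentinel; a violation raises an INTERNAL_ERROR SFException.
    epInfo.verifyEpInfo();
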
// We need to maintain IV as a block counter for the whole file, even interleaved, @@ -120,11 +116,8 @@ static Blob constructBlobAndMetadata( chunkLength = compressedChunkData.length; compressedChunkDataSize = chunkLength; - if (internalParameterProvider.getComputeExtendedMetadataSize()) { - extendedMetadataSize = - Utils.getLittleEndianInt( - compressedChunkData, - chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES); + if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { + extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength); } } @@ -150,13 +143,14 @@ static Blob constructBlobAndMetadata( AbstractRowBuffer.buildEpInfoFromStats( serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined, - internalParameterProvider.setDefaultValuesInEp())) + internalParameterProvider.setDefaultValuesInEp(), + parameterProvider.isEnableDistinctValuesCount())) .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst()) .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond()); if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { chunkMetadataBuilder - .setMajorVersion(Constants.PARQUET_MAJOR_VERSION) + .setMajorVersion(ParquetFileWriter.CURRENT_VERSION) .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java index fe9542267..1de924cc5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * Channel immutable identification and encryption attributes. 
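
(Illustrative sketch, not part of the diff.) The replacement above of the hand-rolled Utils.getLittleEndianInt call with Utils.getExtendedMetadataSize (implemented further down in this diff) relies on the Parquet file trailer layout: a file ends with the serialized footer, a 4-byte little-endian footer length, and the 4-byte magic "PAR1". A self-contained sketch of that read, using a hypothetical footerSize helper rather than the SDK's own method:

    import java.nio.charset.StandardCharsets;

    final class ParquetFooterSketch {
      /** Returns the footer ("extended metadata") size of a serialized Parquet file of the given length. */
      static int footerSize(byte[] bytes, int length) {
        int magicOffset = length - 4;                       // start of the trailing "PAR1" magic
        int footerSizeOffset = magicOffset - Integer.BYTES; // start of the 4-byte little-endian length
        if (footerSizeOffset < 0
            || bytes.length < length
            || !"PAR1".equals(new String(bytes, magicOffset, 4, StandardCharsets.US_ASCII))) {
          throw new IllegalArgumentException("Invalid parquet file");
        }
        return (bytes[footerSizeOffset] & 0xFF)
            | ((bytes[footerSizeOffset + 1] & 0xFF) << 8)
            | ((bytes[footerSizeOffset + 2] & 0xFF) << 16)
            | ((bytes[footerSizeOffset + 3] & 0xFF) << 24);
      }
    }

The production helper in this diff reads the same bytes via ParquetFileWriter.MAGIC / MAGIC_STR and BytesUtils.readIntLittleEndian instead of the inlined constants above.
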
@@ -29,6 +30,8 @@ class ChannelFlushContext { // Data encryption key id private final Long encryptionKeyId; + private final ParquetProperties.WriterVersion parquetWriterVersion; + ChannelFlushContext( String name, String dbName, @@ -36,7 +39,8 @@ class ChannelFlushContext { String tableName, Long channelSequencer, String encryptionKey, - Long encryptionKeyId) { + Long encryptionKeyId, + ParquetProperties.WriterVersion parquetWriterVersion) { this.name = name; this.fullyQualifiedName = Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name); @@ -47,6 +51,7 @@ class ChannelFlushContext { this.channelSequencer = channelSequencer; this.encryptionKey = encryptionKey; this.encryptionKeyId = encryptionKeyId; + this.parquetWriterVersion = parquetWriterVersion; } @Override @@ -115,4 +120,8 @@ String getEncryptionKey() { Long getEncryptionKeyId() { return encryptionKeyId; } + + ParquetProperties.WriterVersion getParquetWriterVersion() { + return parquetWriterVersion; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 110e6383e..a0aa1df35 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -7,7 +7,6 @@ import java.util.Optional; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; -import org.apache.parquet.column.ParquetProperties; /** Channel's buffer relevant parameters that are set at the owning client level. */ public class ClientBufferParameters { @@ -24,12 +23,10 @@ public class ClientBufferParameters { private final Optional maxRowGroups; - private ParquetProperties.WriterVersion parquetWriterVersion; - - private boolean enableDictionaryEncoding; - private boolean isIcebergMode; + private boolean enableDistinctValuesCount; + /** * Private constructor used for test methods * @@ -43,17 +40,15 @@ private ClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding, - boolean isIcebergMode) { + boolean isIcebergMode, + boolean enableDistinctValuesCount) { this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; this.maxRowGroups = maxRowGroups; - this.parquetWriterVersion = parquetWriterVersion; - this.enableDictionaryEncoding = enableDictionaryEncoding; this.isIcebergMode = isIcebergMode; + this.enableDistinctValuesCount = enableDistinctValuesCount; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -74,14 +69,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; - this.parquetWriterVersion = - clientInternal != null - ? clientInternal.getParameterProvider().getParquetWriterVersion() - : ParameterProvider.PARQUET_WRITER_VERSION_DEFAULT; - this.enableDictionaryEncoding = - clientInternal != null - ? 
clientInternal.getParameterProvider().isEnableParquetDictionaryEncoding() - : ParameterProvider.ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT; this.maxRowGroups = isIcebergMode ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT) @@ -90,6 +77,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.isIcebergMode() : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; + this.enableDistinctValuesCount = + clientInternal != null + ? clientInternal.getParameterProvider().isEnableDistinctValuesCount() + : ParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT; } /** @@ -104,18 +95,16 @@ public static ClientBufferParameters test_createClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding, - boolean isIcebergMode) { + boolean isIcebergMode, + boolean enableDistinctValuesCount) { return new ClientBufferParameters( maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, enableNewJsonParsingLogic, maxRowGroups, - parquetWriterVersion, - enableDictionaryEncoding, - isIcebergMode); + isIcebergMode, + enableDistinctValuesCount); } public long getMaxChunkSizeInBytes() { @@ -146,11 +135,7 @@ public String getParquetMessageTypeName() { return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME; } - public ParquetProperties.WriterVersion getParquetWriterVersion() { - return parquetWriterVersion; - } - - public boolean isEnableDictionaryEncoding() { - return enableDictionaryEncoding; + public boolean isEnableDistinctValuesCount() { + return enableDistinctValuesCount; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java index 3a0513e4c..28203cfd5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java @@ -13,16 +13,22 @@ class EpInfo { private Map columnEps; + private boolean enableDistinctValuesCount; + /** Default constructor, needed for Jackson */ EpInfo() {} - EpInfo(long rowCount, Map columnEps) { + EpInfo( + long rowCount, + Map columnEps, + boolean enableDistinctValuesCount) { this.rowCount = rowCount; this.columnEps = columnEps; + this.enableDistinctValuesCount = enableDistinctValuesCount; } /** Some basic verification logic to make sure the EP info is correct */ - public void verifyEpInfo(boolean enableDistinctValues) { + public void verifyEpInfo() { for (Map.Entry entry : columnEps.entrySet()) { String colName = entry.getKey(); FileColumnProperties colEp = entry.getValue(); @@ -35,8 +41,8 @@ public void verifyEpInfo(boolean enableDistinctValues) { colName, colEp.getNullCount(), rowCount)); } - // Make sure the NDV should always be -1 when the NDV is not enabled - if (!enableDistinctValues && colEp.getDistinctValues() != EP_NDV_UNKNOWN) { + // Make sure the NDV should always be -1 when the NDV set to default + if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) { throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 2e64f77b8..ca0a16ef9 100644 --- 
a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -597,11 +597,14 @@ BlobMetadata buildAndUpload( InvalidKeyException { Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency); - InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider(); // Construct the blob along with the metadata of the blob BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata( - blobPath.fileName, blobData, bdecVersion, paramProvider); + blobPath.fileName, + blobData, + bdecVersion, + this.owningClient.getParameterProvider(), + this.owningClient.getInternalParameterProvider()); blob.blobStats.setBuildDurationMs(buildContext); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index bf6e07036..11a2858f6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -31,9 +31,4 @@ boolean setIcebergSpecificFieldsInEp() { // in the EP metadata, createdOn, and extendedMetadataSize. return isIcebergMode; } - - boolean getComputeExtendedMetadataSize() { - // When in Iceberg mode, extendedMetadataSize is computed. Otherwise, it is -1. - return isIcebergMode; - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java index 92f7ea8c5..dce6f060f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelResponse.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import net.snowflake.ingest.utils.Constants; /** Response to the OpenChannelRequest */ class OpenChannelResponse extends StreamingIngestResponse { @@ -22,6 +23,7 @@ class OpenChannelResponse extends StreamingIngestResponse { private String encryptionKey; private Long encryptionKeyId; private FileLocationInfo icebergLocationInfo; + private String icebergSerializationPolicy; @JsonProperty("status_code") void setStatusCode(Long statusCode) { @@ -140,4 +142,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) { FileLocationInfo getIcebergLocationInfo() { return this.icebergLocationInfo; } + + @JsonProperty("iceberg_serialization_policy") + void setIcebergSerializationPolicy(String icebergSerializationPolicy) { + this.icebergSerializationPolicy = icebergSerializationPolicy; + } + + Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() { + return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 5b11996ec..3ee240226 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -31,7 +31,6 @@ public class ParquetFlusher implements Flusher { private final Optional maxRowGroups; private final Constants.BdecParquetCompression bdecParquetCompression; - private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean 
enableDictionaryEncoding; /** Construct parquet flusher from its schema. */ @@ -40,13 +39,11 @@ public ParquetFlusher( long maxChunkSizeInBytes, Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, - ParquetProperties.WriterVersion parquetWriterVersion, boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; - this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; } @@ -69,6 +66,7 @@ private SerializationResult serializeFromJavaObjects( BdecParquetWriter parquetWriter; ByteArrayOutputStream mergedData = new ByteArrayOutputStream(); Pair chunkMinMaxInsertTimeInMs = null; + ParquetProperties.WriterVersion parquetWriterVersion = null; for (ChannelData data : channelsDataPerTable) { // Create channel metadata @@ -110,6 +108,15 @@ private SerializationResult serializeFromJavaObjects( chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs()); } + // Check if all the channels have the same parquet writer version + if (parquetWriterVersion == null) { + parquetWriterVersion = data.getChannelContext().getParquetWriterVersion(); + } else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Parquet writer version and storage serialization policy mismatch within a chunk"); + } + rows.addAll(data.getVectors().rows); rowCount += data.getRowCount(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index adb8eb9b0..0ee6b9696 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -106,7 +106,7 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValuesCount())); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { @@ -121,7 +121,7 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValuesCount())); } } @@ -195,7 +195,7 @@ public void setupSchema(List columns) { null /* collationDefinitionString */, ordinal, fieldId, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValuesCount())); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { @@ -206,7 +206,7 @@ public void setupSchema(List columns) { null /* collationDefinitionString */, ordinal, fieldId, - clientBufferParameters.getIsIcebergMode())); + clientBufferParameters.isEnableDistinctValuesCount())); } } } @@ -399,7 +399,6 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), - clientBufferParameters.getParquetWriterVersion(), clientBufferParameters.getIsIcebergMode()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java 
index 7e54b8d59..add7ad1ad 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java @@ -8,7 +8,6 @@ import java.math.BigInteger; import java.nio.charset.StandardCharsets; -import java.util.Base64; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -62,8 +61,8 @@ class RowBufferStats { reset(); } - RowBufferStats(String columnDisplayName) { - this(columnDisplayName, null, -1, null, false); + RowBufferStats(String columnDisplayName, boolean enableDistinctValues) { + this(columnDisplayName, null, -1, null, enableDistinctValues); } void reset() { @@ -160,11 +159,8 @@ void addStrValue(String value) { } void addBinaryValue(byte[] valueBytes) { + // TODO: SNOW-1721193: Add NDV support in EP for binary columns this.setCurrentMaxLength(valueBytes.length); - if (enableDistinctValue) { - distinctValues.add(new String(Base64.getEncoder().encode(valueBytes))); - } - // Check if new min/max string if (this.currentMinStrValue == null) { this.currentMinStrValue = valueBytes; @@ -258,7 +254,9 @@ long getCurrentMaxLength() { * @return -1 indicating the NDV is unknown */ long getDistinctValues() { - return enableDistinctValue ? distinctValues.size() : EP_NDV_UNKNOWN; + return enableDistinctValue && currentMinStrValue == null + ? distinctValues.size() + : EP_NDV_UNKNOWN; } String getCollationDefinitionString() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java index a56b82ed5..9c39adc80 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFactory.java @@ -8,6 +8,7 @@ import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** Builds a Streaming Ingest channel for a specific Streaming Ingest client */ class SnowflakeStreamingIngestChannelFactory { @@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder { private OpenChannelRequest.OnErrorOption onErrorOption; private ZoneId defaultTimezone; private OffsetTokenVerificationFunction offsetTokenVerificationFunction; + private ParquetProperties.WriterVersion parquetWriterVersion; private SnowflakeStreamingIngestChannelBuilder(String name) { this.name = name; @@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder setOffsetTokenVerificationFunction( return this; } + SnowflakeStreamingIngestChannelBuilder setParquetWriterVersion( + ParquetProperties.WriterVersion parquetWriterVersion) { + this.parquetWriterVersion = parquetWriterVersion; + return this; + } + SnowflakeStreamingIngestChannelInternal build() { Utils.assertStringNotNullOrEmpty("channel name", this.name); Utils.assertStringNotNullOrEmpty("table name", this.tableName); @@ -124,7 +132,8 @@ SnowflakeStreamingIngestChannelInternal build() { this.onErrorOption, this.defaultTimezone, this.owningClient.getParameterProvider().getBlobFormatVersion(), - this.offsetTokenVerificationFunction); + this.offsetTokenVerificationFunction, + this.parquetWriterVersion); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java 
b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 4e884387b..af12b31ea 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -30,6 +30,7 @@ import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.column.ParquetProperties; /** * The first version of implementation for SnowflakeStreamingIngestChannel @@ -107,7 +108,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn onErrorOption, defaultTimezone, client.getParameterProvider().getBlobFormatVersion(), - null); + null /* offsetTokenVerificationFunction */, + null /* parquetWriterVersion */); } /** Default constructor */ @@ -125,7 +127,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, Constants.BdecVersion bdecVersion, - OffsetTokenVerificationFunction offsetTokenVerificationFunction) { + OffsetTokenVerificationFunction offsetTokenVerificationFunction, + ParquetProperties.WriterVersion parquetWriterVersion) { this.isClosed = false; this.owningClient = client; @@ -141,7 +144,16 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance(); this.channelFlushContext = new ChannelFlushContext( - name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); + name, + dbName, + schemaName, + tableName, + channelSequencer, + encryptionKey, + encryptionKeyId, + parquetWriterVersion == null + ? 
ParquetProperties.DEFAULT_WRITER_VERSION + : parquetWriterVersion); this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true); this.rowBuffer = AbstractRowBuffer.createRowBuffer( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index fd1c0e38a..c57745c02 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -386,6 +386,8 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setOnErrorOption(request.getOnErrorOption()) .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .setParquetWriterVersion( + response.getIcebergSerializationPolicy().getParquetWriterVersion()) .build(); // Setup the row buffer schema diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index aeb3682d2..148339ea4 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.utils; import java.util.Arrays; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** Contains all the constants needed for Streaming Ingest */ @@ -71,11 +72,47 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; - public static final int PARQUET_MAJOR_VERSION = 1; public static final int PARQUET_MINOR_VERSION = 0; - // Length of the magic bytes "PAR1" or "PARE" in Parquet file - public static final int PARQUET_MAGIC_BYTES_LENGTH = 4; + /** + * Iceberg table serialization policy. Use v2 parquet writer for optimized serialization, + * otherwise v1. 
+ */ + public enum IcebergSerializationPolicy { + NON_ICEBERG, + COMPATIBLE, + OPTIMIZED; + + public static IcebergSerializationPolicy fromName(String name) { + if (name == null) { + return NON_ICEBERG; + } + for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) { + if (e.name().equalsIgnoreCase(name)) { + return e; + } + } + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + name, Arrays.asList(IcebergSerializationPolicy.values()))); + } + + public ParquetProperties.WriterVersion getParquetWriterVersion() { + switch (this) { + case NON_ICEBERG: + case COMPATIBLE: + return ParquetProperties.WriterVersion.PARQUET_1_0; + case OPTIMIZED: + return ParquetProperties.WriterVersion.PARQUET_2_0; + default: + throw new IllegalArgumentException( + String.format( + "Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s", + this.name(), Arrays.asList(IcebergSerializationPolicy.values()))); + } + } + } public enum WriteMode { CLOUD_STORAGE, diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 0ff0bc30d..675f07dae 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -10,7 +10,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.parquet.column.ParquetProperties; /** Utility class to provide configurable constants */ public class ParameterProvider { @@ -48,9 +47,8 @@ public class ParameterProvider { public static final String ENABLE_NEW_JSON_PARSING_LOGIC = "ENABLE_NEW_JSON_PARSING_LOGIC".toLowerCase(); - public static final String PARQUET_WRITER_VERSION = "PARQUET_WRITER_VERSION".toLowerCase(); - public static final String ENABLE_PARQUET_DICTIONARY_ENCODING = - "ENABLE_PARQUET_DICTIONARY_ENCODING".toLowerCase(); + public static final String ENABLE_DISTINCT_VALUES_COUNT = + "ENABLE_DISTINCT_VALUES_COUNT".toLowerCase(); // Default values public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; @@ -79,11 +77,6 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - public static final ParquetProperties.WriterVersion PARQUET_WRITER_VERSION_DEFAULT = - ParquetProperties.WriterVersion.PARQUET_1_0; - - public static final boolean ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT = false; - /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; @@ -91,6 +84,8 @@ public class ParameterProvider { public static final boolean IS_ICEBERG_MODE_DEFAULT = false; + public static final boolean ENABLE_DISTINCT_VALUES_COUNT_DEFAULT = false; + /** Map of parameter name to parameter value. This will be set by client/configure API Call. 
*/ private final Map parameterMap = new HashMap<>(); @@ -167,73 +162,85 @@ private void setParameterMap( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_INTERVAL_IN_MILLIS, INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_BYTES, INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( ENABLE_SNOWPIPE_STREAMING_METRICS, SNOWPIPE_STREAMING_METRICS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( - BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props, false); + BLOB_FORMAT_VERSION, + BLOB_FORMAT_VERSION_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); getBlobFormatVersion(); // to verify parsing the configured value this.checkAndUpdate( - IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props, false); + IO_TIME_CPU_RATIO, + IO_TIME_CPU_RATIO_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); this.checkAndUpdate( BLOB_UPLOAD_MAX_RETRY_COUNT, BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( - MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, false); + MAX_CHUNK_SIZE_IN_BYTES, + MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); this.checkAndUpdate( MAX_CLIENT_LAG, isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_CHUNKS_IN_BLOB, @@ -247,31 +254,28 @@ private void setParameterMap( MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( ENABLE_NEW_JSON_PARSING_LOGIC, ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, parameterOverrides, props, - false); - - this.checkAndUpdate( - PARQUET_WRITER_VERSION, PARQUET_WRITER_VERSION_DEFAULT, parameterOverrides, props, false); + false /* enforceDefault */); this.checkAndUpdate( - ENABLE_PARQUET_DICTIONARY_ENCODING, - ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT, + ENABLE_DISTINCT_VALUES_COUNT, + ENABLE_DISTINCT_VALUES_COUNT_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) { throw new IllegalArgumentException( @@ -507,21 +511,11 @@ public boolean isEnableNewJsonParsingLogic() { return (val instanceof String) ? 
Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** @return Parquet writer version */ - public ParquetProperties.WriterVersion getParquetWriterVersion() { - Object val = - this.parameterMap.getOrDefault(PARQUET_WRITER_VERSION, PARQUET_WRITER_VERSION_DEFAULT); - if (val instanceof ParquetProperties.WriterVersion) { - return (ParquetProperties.WriterVersion) val; - } - return ParquetProperties.WriterVersion.fromString((String) val); - } - - /** @return Whether to enable dictionary encoding in parquet */ - public boolean isEnableParquetDictionaryEncoding() { + /** @return Whether to enable distinct values count */ + public boolean isEnableDistinctValuesCount() { Object val = this.parameterMap.getOrDefault( - ENABLE_PARQUET_DICTIONARY_ENCODING, ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT); + ENABLE_DISTINCT_VALUES_COUNT, ENABLE_DISTINCT_VALUES_COUNT_DEFAULT); return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 974f6fa8c..becbfb46a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -8,6 +8,7 @@ import com.codahale.metrics.Timer; import io.netty.util.internal.PlatformDependent; +import java.io.IOException; import java.io.StringReader; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; @@ -29,6 +30,8 @@ import java.util.Properties; import net.snowflake.client.core.SFSessionProperty; import org.apache.commons.codec.binary.Base64; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; @@ -432,16 +435,22 @@ public static String concatDotPath(String... 
path) { } /** - * Get the little endian int from a byte array with offset + * Get the extended metadata size (footer size) from a parquet file * - * @param bytes the byte array - * @param offset the offset - * @return the little endian int + * @param bytes the serialized parquet file + * @param length the length of the byte array without padding + * @return the extended metadata size */ - public static int getLittleEndianInt(byte[] bytes, int offset) { - return (bytes[offset] & 0xFF) - | ((bytes[offset + 1] & 0xFF) << 8) - | ((bytes[offset + 2] & 0xFF) << 16) - | ((bytes[offset + 3] & 0xFF) << 24); + public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException { + final int magicOffset = length - ParquetFileWriter.MAGIC.length; + final int footerSizeOffset = magicOffset - Integer.BYTES; + if (bytes.length < length + || footerSizeOffset < 0 + || !ParquetFileWriter.MAGIC_STR.equals( + new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) { + throw new IllegalArgumentException("Invalid parquet file"); + } + + return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); } } diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index 2bc69c4b4..c3126847d 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -43,11 +43,12 @@ public class BdecParquetWriter implements AutoCloseable { // Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end // up throwing private final Optional maxRowGroups; - private long rowsWritten = 0; private final ParquetProperties.WriterVersion writerVersion; private final boolean enableDictionaryEncoding; + private long rowsWritten = 0; + /** * Creates a BDEC specific parquet writer. * diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 09f14056f..e5d96f6b8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -13,6 +13,7 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Pair; +import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.BdecParquetWriter; @@ -39,6 +40,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, + new ParameterProvider(isIceberg), new InternalParameterProvider(isIceberg)); // Construction fails if metadata contains 0 rows and data 1 row @@ -47,6 +49,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0)), Constants.BdecVersion.THREE, + new ParameterProvider(isIceberg), new InternalParameterProvider(isIceberg)); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -75,7 +78,6 @@ private List> createChannelDataPerTable(int metada 100L, isIceberg ? 
Optional.of(1) : Optional.empty(), Constants.BdecParquetCompression.GZIP, - ParquetProperties.WriterVersion.PARQUET_1_0, false)) .when(channelData) .createFlusher(); @@ -104,9 +106,23 @@ private List> createChannelDataPerTable(int metada channelData .getColumnEps() .putIfAbsent( - columnName, new RowBufferStats(columnName, null, 1, isIceberg ? 0 : null, isIceberg)); + columnName, + new RowBufferStats( + columnName, + null, + 1, + isIceberg ? 0 : null, + ParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT)); channelData.setChannelContext( - new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L)); + new ChannelFlushContext( + "channel1", + "DB", + "SCHEMA", + "TABLE", + 1L, + "enc", + 1L, + ParquetProperties.DEFAULT_WRITER_VERSION)); return Collections.singletonList(channelData); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index 5b545d697..8de3d6f84 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -8,13 +8,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ChannelDataTest { + @Parameterized.Parameters(name = "enableDistinctValues: {0}") + public static Object[] enableDistinctValues() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean enableDistinctValues; @Test public void testGetCombinedColumnStatsMapNulls() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); @@ -43,12 +52,12 @@ public void testGetCombinedColumnStatsMapNulls() { @Test public void testGetCombinedColumnStatsMapMissingColumn() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); Map right = new HashMap<>(); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableDistinctValues); right.put("foo", rightStats1); rightStats1.addIntValue(new BigInteger("11")); @@ -78,10 +87,10 @@ public void testGetCombinedColumnStatsMap() { Map left = new HashMap<>(); Map right = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); - RowBufferStats leftStats2 = new RowBufferStats("COL1"); - RowBufferStats rightStats2 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats leftStats2 = new RowBufferStats("COL1", enableDistinctValues); + RowBufferStats rightStats2 = new RowBufferStats("COL1", enableDistinctValues); left.put("one", leftStats1); left.put("two", leftStats2); @@ -107,7 +116,7 @@ public void testGetCombinedColumnStatsMap() { Assert.assertEquals(new BigInteger("10"), oneCombined.getCurrentMinIntValue()); Assert.assertEquals(new BigInteger("17"), oneCombined.getCurrentMaxIntValue()); - 
Assert.assertEquals(-1, oneCombined.getDistinctValues()); + Assert.assertEquals(enableDistinctValues ? 5 : -1, oneCombined.getDistinctValues()); Assert.assertNull(oneCombined.getCurrentMinStrValue()); Assert.assertNull(oneCombined.getCurrentMaxStrValue()); Assert.assertNull(oneCombined.getCurrentMinRealValue()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index cd7354f09..9b777dc90 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -301,6 +301,7 @@ SnowflakeStreamingIngestChannelInternal>> createChannel( onErrorOption, defaultTimezone, Constants.BdecVersion.THREE, + null, null); } @@ -877,8 +878,10 @@ public void testBuildAndUpload() throws Exception { Map eps1 = new HashMap<>(); Map eps2 = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("COL1"); - RowBufferStats stats2 = new RowBufferStats("COL1"); + RowBufferStats stats1 = + new RowBufferStats("COL1", ParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT); + RowBufferStats stats2 = + new RowBufferStats("COL1", ParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT); eps1.put("one", stats1); eps2.put("one", stats2); @@ -910,7 +913,7 @@ public void testBuildAndUpload() throws Exception { EpInfo expectedChunkEpInfo = AbstractRowBuffer.buildEpInfoFromStats( - 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode); + 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode, isIcebergMode); ChannelMetadata expectedChannel1Metadata = ChannelMetadata.builder() @@ -1115,13 +1118,13 @@ public void testBlobBuilder() throws Exception { Map eps1 = new HashMap<>(); - RowBufferStats stats1 = new RowBufferStats("COL1"); + RowBufferStats stats1 = new RowBufferStats("COL1", isIcebergMode); eps1.put("one", stats1); stats1.addIntValue(new BigInteger("10")); stats1.addIntValue(new BigInteger("15")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode, isIcebergMode); ChannelMetadata channelMetadata = ChannelMetadata.builder() diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 007dc3e23..e617d83d4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -38,7 +38,7 @@ public void parseValueBoolean() { Type type = Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -61,7 +61,7 @@ public void parseValueBoolean() { public void parseValueInt() { Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("INT_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -88,7 +88,7 @@ public void parseValueDecimalToInt() { .as(LogicalTypeAnnotation.decimalType(4, 9)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = 
new RowBufferStats("DECIMAL_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -115,7 +115,7 @@ public void parseValueDateToInt() { .as(LogicalTypeAnnotation.dateType()) .named("DATE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -139,7 +139,7 @@ public void parseValueDateToInt() { public void parseValueLong() { Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -166,7 +166,7 @@ public void parseValueDecimalToLong() { .as(LogicalTypeAnnotation.decimalType(9, 18)) .named("DECIMAL_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -193,7 +193,7 @@ public void parseValueTimeToLong() { .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIME_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -220,7 +220,7 @@ public void parseValueTimestampToLong() { .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -247,7 +247,7 @@ public void parseValueTimestampTZToLong() { .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) .named("TIMESTAMP_TZ_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -271,7 +271,7 @@ public void parseValueTimestampTZToLong() { public void parseValueFloat() { Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -295,7 +295,7 @@ public void parseValueFloat() { public void parseValueDouble() { Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -319,7 +319,7 @@ public void parseValueDouble() { public void parseValueBinary() { Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -347,7 +347,7 @@ public void parseValueStringToBinary() { .as(LogicalTypeAnnotation.stringType()) .named("BINARY_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + RowBufferStats rowBufferStats = new 
RowBufferStats("BINARY_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -377,7 +377,7 @@ public void parseValueFixed() { .length(4) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -406,7 +406,7 @@ public void parseValueDecimalToFixed() { .as(LogicalTypeAnnotation.decimalType(10, 20)) .named("FIXED_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true); Map rowBufferStatsMap = new HashMap() { { @@ -433,7 +433,7 @@ public void parseList() throws JsonProcessingException { Types.optionalList() .element(Types.optional(PrimitiveTypeName.INT32).named("element")) .named("LIST_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element"); + RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true); Map rowBufferStatsMap = new HashMap() { { @@ -500,8 +500,8 @@ public void parseMap() throws JsonProcessingException { .key(Types.required(PrimitiveTypeName.INT32).named("key")) .value(Types.optional(PrimitiveTypeName.INT32).named("value")) .named("MAP_COL"); - RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key"); - RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value"); + RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true); + RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true); Map rowBufferStatsMap = new HashMap() { { @@ -592,8 +592,8 @@ public void parseStruct() throws JsonProcessingException { .named("b")) .named("STRUCT_COL"); - RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a"); - RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b"); + RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true); + RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b", true); Map rowBufferStatsMap = new HashMap() { { @@ -703,7 +703,7 @@ public void parseNestedTypes() { private static Type generateNestedTypeAndStats( int depth, String name, Map rowBufferStatsMap, String path) { if (depth == 0) { - rowBufferStatsMap.put(path, new RowBufferStats(path)); + rowBufferStatsMap.put(path, new RowBufferStats(path, true)); return Types.optional(PrimitiveTypeName.INT32).named(name); } switch (depth % 3) { @@ -718,7 +718,8 @@ private static Type generateNestedTypeAndStats( .addField(generateNestedTypeAndStats(depth - 1, "a", rowBufferStatsMap, path + ".a")) .named(name); case 0: - rowBufferStatsMap.put(path + ".key_value.key", new RowBufferStats(path + ".key_value.key")); + rowBufferStatsMap.put( + path + ".key_value.key", new RowBufferStats(path + ".key_value.key", true)); return Types.optionalMap() .key(Types.required(PrimitiveTypeName.INT32).named("key")) .value( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index 9f2e848f3..06ca3b96c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -4,12 +4,22 @@ import java.nio.charset.StandardCharsets; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public 
class RowBufferStatsTest {
+  @Parameterized.Parameters(name = "enableDistinctValues: {0}")
+  public static Object[] enableDistinctValues() {
+    return new Object[] {false, true};
+  }
+
+  @Parameterized.Parameter public boolean enableDistinctValues;
+
   @Test
   public void testEmptyState() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
 
     Assert.assertNull(stats.getCollationDefinitionString());
     Assert.assertNull(stats.getCurrentMinRealValue());
@@ -20,12 +30,12 @@ public void testEmptyState() throws Exception {
     Assert.assertNull(stats.getCurrentMaxIntValue());
 
     Assert.assertEquals(0, stats.getCurrentNullCount());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 0 : -1, stats.getDistinctValues());
   }
 
   @Test
   public void testMinMaxStrNonCol() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
     stats.addStrValue("bob");
 
     Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue());
@@ -55,22 +65,22 @@ public void testMinMaxStrNonCol() throws Exception {
 
   @Test
   public void testMinMaxInt() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
 
     stats.addIntValue(BigInteger.valueOf(5));
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 1 : -1, stats.getDistinctValues());
 
     stats.addIntValue(BigInteger.valueOf(6));
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 2 : -1, stats.getDistinctValues());
 
     stats.addIntValue(BigInteger.valueOf(4));
     Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 3 : -1, stats.getDistinctValues());
 
     Assert.assertNull(stats.getCurrentMinRealValue());
     Assert.assertNull(stats.getCurrentMaxRealValue());
@@ -82,22 +92,22 @@ public void testMinMaxInt() throws Exception {
 
   @Test
   public void testMinMaxReal() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
 
     stats.addRealValue(1.0);
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 1 : -1, stats.getDistinctValues());
 
     stats.addRealValue(1.5);
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 2 : -1, stats.getDistinctValues());
 
     stats.addRealValue(.8);
     Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 3 : -1, stats.getDistinctValues());
 
     Assert.assertNull(stats.getCurrentMinIntValue());
     Assert.assertNull(stats.getCurrentMaxIntValue());
@@ -109,7 +119,7 @@ public void testMinMaxReal() throws Exception {
 
   @Test
   public void testIncCurrentNullCount() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
 
     Assert.assertEquals(0, stats.getCurrentNullCount());
     stats.incCurrentNullCount();
@@ -120,7 +130,7 @@ public void testIncCurrentNullCount() throws Exception {
 
   @Test
   public void testMaxLength() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableDistinctValues);
 
     Assert.assertEquals(0, stats.getCurrentMaxLength());
     stats.setCurrentMaxLength(100L);
@@ -132,8 +142,8 @@ public void testMaxLength() throws Exception {
   @Test
   public void testGetCombinedStats() throws Exception {
     // Test for Integers
-    RowBufferStats one = new RowBufferStats("COL1");
-    RowBufferStats two = new RowBufferStats("COL1");
+    RowBufferStats one = new RowBufferStats("COL1", enableDistinctValues);
+    RowBufferStats two = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addIntValue(BigInteger.valueOf(2));
     one.addIntValue(BigInteger.valueOf(4));
@@ -150,7 +160,7 @@ public void testGetCombinedStats() throws Exception {
     RowBufferStats result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 7 : -1, result.getDistinctValues());
     Assert.assertEquals(2, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinStrValue());
@@ -159,8 +169,8 @@ public void testGetCombinedStats() throws Exception {
     Assert.assertNull(result.getCurrentMaxRealValue());
 
     // Test for Reals
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableDistinctValues);
+    two = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addRealValue(2d);
     one.addRealValue(4d);
@@ -175,7 +185,7 @@ public void testGetCombinedStats() throws Exception {
     result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 7 : -1, result.getDistinctValues());
     Assert.assertEquals(0, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCollationDefinitionString());
@@ -185,8 +195,8 @@ public void testGetCombinedStats() throws Exception {
     Assert.assertNull(result.getCurrentMaxIntValue());
 
     // Test for Strings without collation
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableDistinctValues);
+    two = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addStrValue("alpha");
     one.addStrValue("d");
@@ -218,8 +228,8 @@ public void testGetCombinedStats() throws Exception {
   @Test
   public void testGetCombinedStatsNull() throws Exception {
     // Test for Integers
-    RowBufferStats one = new RowBufferStats("COL1");
-    RowBufferStats two = new RowBufferStats("COL1");
+    RowBufferStats one = new RowBufferStats("COL1", enableDistinctValues);
+    RowBufferStats two = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addIntValue(BigInteger.valueOf(2));
     one.addIntValue(BigInteger.valueOf(4));
@@ -231,7 +241,7 @@ public void testGetCombinedStatsNull() throws Exception {
     RowBufferStats result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 4 : -1, result.getDistinctValues());
     Assert.assertEquals(2, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinStrValue());
@@ -240,7 +250,7 @@ public void testGetCombinedStatsNull() throws Exception {
     Assert.assertNull(result.getCurrentMaxRealValue());
 
     // Test for Reals
-    one = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addRealValue(2d);
     one.addRealValue(4d);
@@ -250,7 +260,7 @@ public void testGetCombinedStatsNull() throws Exception {
     result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+    Assert.assertEquals(enableDistinctValues ? 4 : -1, result.getDistinctValues());
     Assert.assertEquals(0, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinStrValue());
@@ -259,8 +269,8 @@ public void testGetCombinedStatsNull() throws Exception {
     Assert.assertNull(result.getCurrentMaxIntValue());
 
     // Test for Strings
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableDistinctValues);
+    two = new RowBufferStats("COL1", enableDistinctValues);
 
     one.addStrValue("alpha");
     one.addStrValue("d");
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 8b5ad153d..0d610c615 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -26,6 +26,7 @@
 import net.snowflake.ingest.utils.SFException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.BdecParquetReader;
 import org.junit.Assert;
 import org.junit.Before;
@@ -149,8 +150,7 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
             Constants.BdecParquetCompression.GZIP,
             ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT,
             isIcebergMode ? Optional.of(1) : Optional.empty(),
-            PARQUET_WRITER_VERSION_DEFAULT,
-            ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT,
+            isIcebergMode,
             isIcebergMode),
         null,
         null);
@@ -563,12 +563,12 @@ private void testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption o
   public void testBuildEpInfoFromStats() {
     Map colStats = new HashMap<>();
 
-    RowBufferStats stats1 = new RowBufferStats("intColumn");
+    RowBufferStats stats1 = new RowBufferStats("intColumn", isIcebergMode);
     stats1.addIntValue(BigInteger.valueOf(2));
     stats1.addIntValue(BigInteger.valueOf(10));
     stats1.addIntValue(BigInteger.valueOf(1));
 
-    RowBufferStats stats2 = new RowBufferStats("strColumn");
+    RowBufferStats stats2 = new RowBufferStats("strColumn", isIcebergMode);
     stats2.addStrValue("alice");
     stats2.addStrValue("bob");
     stats2.incCurrentNullCount();
@@ -576,7 +576,8 @@ public void testBuildEpInfoFromStats() {
     colStats.put("intColumn", stats1);
     colStats.put("strColumn", stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
+    EpInfo result =
+        AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode, isIcebergMode);
 
     Map columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
@@ -591,7 +592,7 @@ public void testBuildEpInfoFromStats() {
     Assert.assertEquals(1, strColumnResult.getNullCount());
 
     FileColumnProperties intColumnResult = columnResults.get("intColumn");
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 3 : -1, intColumnResult.getDistinctValues());
     Assert.assertEquals(BigInteger.valueOf(1), intColumnResult.getMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(10), intColumnResult.getMaxIntValue());
     Assert.assertEquals(0, intColumnResult.getNullCount());
@@ -603,20 +604,21 @@ public void testBuildEpInfoFromNullColumnStats() {
     final String realColName = "realCol";
     Map colStats = new HashMap<>();
 
-    RowBufferStats stats1 = new RowBufferStats(intColName);
-    RowBufferStats stats2 = new RowBufferStats(realColName);
+    RowBufferStats stats1 = new RowBufferStats(intColName, isIcebergMode);
+    RowBufferStats stats2 = new RowBufferStats(realColName, isIcebergMode);
     stats1.incCurrentNullCount();
     stats2.incCurrentNullCount();
 
     colStats.put(intColName, stats1);
     colStats.put(realColName, stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
+    EpInfo result =
+        AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode, isIcebergMode);
 
     Map columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
 
     FileColumnProperties intColumnResult = columnResults.get(intColName);
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 0 : -1, intColumnResult.getDistinctValues());
     Assert.assertEquals(
         isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
         intColumnResult.getMinIntValue());
@@ -627,7 +629,7 @@ public void testBuildEpInfoFromNullColumnStats() {
     Assert.assertEquals(0, intColumnResult.getMaxLength());
 
     FileColumnProperties realColumnResult = columnResults.get(realColName);
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 0 : -1, intColumnResult.getDistinctValues());
     Assert.assertEquals(
         isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
         realColumnResult.getMinRealValue());
@@ -642,12 +644,12 @@ public void testBuildEpInfoFromNullColumnStats() {
   public void testInvalidEPInfo() {
     Map colStats = new HashMap<>();
 
-    RowBufferStats stats1 = new RowBufferStats("intColumn");
+    RowBufferStats stats1 = new RowBufferStats("intColumn", isIcebergMode);
     stats1.addIntValue(BigInteger.valueOf(2));
     stats1.addIntValue(BigInteger.valueOf(10));
     stats1.addIntValue(BigInteger.valueOf(1));
 
-    RowBufferStats stats2 = new RowBufferStats("strColumn");
+    RowBufferStats stats2 = new RowBufferStats("strColumn", isIcebergMode);
     stats2.addStrValue("alice");
     stats2.incCurrentNullCount();
     stats2.incCurrentNullCount();
@@ -656,7 +658,7 @@ public void testInvalidEPInfo() {
     colStats.put("strColumn", stats2);
 
     try {
-      AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode);
+      AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode, isIcebergMode);
       fail("should fail when row count is smaller than null count.");
     } catch (SFException e) {
       Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
@@ -769,33 +771,36 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) {
     Assert.assertEquals(
         BigInteger.valueOf(10), columnEpStats.get("colTinyInt").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("colTinyInt").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("colTinyInt").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 2 : -1, columnEpStats.get("colTinyInt").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLTINYINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLTINYINT").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 1 : -1, columnEpStats.get("COLTINYINT").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(3), columnEpStats.get("COLSMALLINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(2), columnEpStats.get("COLSMALLINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLSMALLINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLSMALLINT").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 2 : -1, columnEpStats.get("COLSMALLINT").getDistinctValues());
 
     Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMaxIntValue());
     Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMinIntValue());
     Assert.assertEquals(1L, columnEpStats.get("COLINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLINT").getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 1 : -1, columnEpStats.get("COLINT").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(40), columnEpStats.get("COLBIGINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(4), columnEpStats.get("COLBIGINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLBIGINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLBIGINT").getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 2 : -1, columnEpStats.get("COLBIGINT").getDistinctValues());
 
     Assert.assertArrayEquals(
         "2".getBytes(StandardCharsets.UTF_8), columnEpStats.get("COLCHAR").getCurrentMinStrValue());
@@ -1908,7 +1913,16 @@ public void testParquetFileNameMetadata() throws IOException {
     bufferUnderTest.setupSchema(Collections.singletonList(colChar));
     loadData(bufferUnderTest, Collections.singletonMap("colChar", "a"));
     ChannelData data = bufferUnderTest.flush();
-    data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));
+    data.setChannelContext(
+        new ChannelFlushContext(
+            "name",
+            "db",
+            "schema",
+            "table",
+            1L,
+            "key",
+            0L,
+            ParquetProperties.DEFAULT_WRITER_VERSION));
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
     Flusher.SerializationResult result =
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
index 64db57c31..1397acfd7 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
@@ -29,7 +29,7 @@ public void parseValueFixedSB1ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             12,
@@ -61,7 +61,7 @@ public void parseValueFixedSB2ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            1234,
@@ -93,7 +93,7 @@ public void parseValueFixedSB4ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             123456789,
@@ -125,7 +125,7 @@ public void parseValueFixedSB8ToInt64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             123456789987654321L,
@@ -157,7 +157,7 @@ public void parseValueFixedSB16ToByteArray() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             new BigDecimal("91234567899876543219876543211234567891"),
@@ -191,7 +191,7 @@ public void parseValueFixedDecimalToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             new BigDecimal("12345.54321"),
@@ -221,7 +221,7 @@ public void parseValueDouble() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             12345.54321d,
@@ -251,7 +251,7 @@ public void parseValueBoolean() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             true,
@@ -281,7 +281,7 @@ public void parseValueBinary() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "1234abcd".getBytes(),
@@ -326,7 +326,7 @@ private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonPa
     String var =
         "{\"key1\":-879869596,\"key2\":\"value2\",\"key3\":null,"
            + "\"key4\":{\"key41\":0.032437,\"key42\":\"value42\",\"key43\":null}}";
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             var,
@@ -376,7 +376,7 @@ private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsin
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             var,
@@ -417,7 +417,7 @@ public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) {
     input.put("b", "2");
     input.put("c", "3");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             input,
@@ -455,7 +455,7 @@ public void parseValueTextToBinary() {
     String text = "This is a sample text! Length is bigger than 32 bytes :)";
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             text,
@@ -492,7 +492,7 @@ public void parseValueTimestampNtzSB4Error() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     SFException exception =
         Assert.assertThrows(
             SFException.class,
@@ -520,7 +520,7 @@ public void parseValueTimestampNtzSB8ToINT64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "2013-04-28T20:57:01.000",
@@ -551,7 +551,7 @@ public void parseValueTimestampNtzSB16ToByteArray() {
             .scale(9) // nanos
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "2022-09-18T22:05:07.123456789",
@@ -583,7 +583,7 @@ public void parseValueDateToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "2021-01-01",
@@ -614,7 +614,7 @@ public void parseValueTimeSB4ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "01:00:00",
@@ -645,7 +645,7 @@ public void parseValueTimeSB8ToInt64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             "01:00:00.123",
@@ -676,7 +676,7 @@ public void parseValueTimeSB16Error() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false);
     SFException exception =
         Assert.assertThrows(
             SFException.class,
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index 0dbeeebee..ce38676ea 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -130,6 +130,7 @@ public void setup() throws Exception {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
     channel2 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -146,6 +147,7 @@
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
     channel3 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -162,6 +164,7 @@
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
     channel4 =
         new SnowflakeStreamingIngestChannelInternal<>(
@@ -178,6 +181,7 @@
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
   }
 
@@ -375,6 +379,7 @@ public void testGetChannelsStatusWithRequest() throws Exception {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
 
     ChannelsStatusRequest.ChannelStatusRequestDTO dto =
@@ -437,6 +442,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
 
     try {
@@ -490,6 +496,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
 
     ChannelMetadata channelMetadata =
@@ -500,8 +507,9 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
             .build();
 
     Map columnEps = new HashMap<>();
-    columnEps.put("column", new RowBufferStats("COL1"));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
+    columnEps.put("column", new RowBufferStats("COL1", isIcebergMode));
+    EpInfo epInfo =
+        AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode, isIcebergMode);
 
     ChunkMetadata chunkMetadata =
         ChunkMetadata.builder()
@@ -549,8 +557,9 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
 
   private Pair, Set> getRetryBlobMetadata() {
     Map columnEps = new HashMap<>();
-    columnEps.put("column", new RowBufferStats("COL1"));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
+    columnEps.put("column", new RowBufferStats("COL1", isIcebergMode));
+    EpInfo epInfo =
+        AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode, isIcebergMode);
 
     ChannelMetadata channelMetadata1 =
         ChannelMetadata.builder()
@@ -1250,6 +1259,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             ZoneOffset.UTC,
             BDEC_VERSION,
+            null,
             null);
 
     ChannelsStatusRequest.ChannelStatusRequestDTO dto =