From bef322d29c77b45d96b987d5342611382e043fa6 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 11 Oct 2024 16:14:19 -0700 Subject: [PATCH] done --- .../streaming/internal/AbstractRowBuffer.java | 14 +- .../streaming/internal/BlobBuilder.java | 11 +- .../streaming/internal/ChannelData.java | 2 +- .../internal/ClientBufferParameters.java | 42 +++- .../ingest/streaming/internal/EpInfo.java | 12 +- .../internal/FileColumnProperties.java | 14 ++ .../streaming/internal/FlushService.java | 6 +- .../internal/IcebergParquetValueParser.java | 29 ++- .../internal/InternalParameterProvider.java | 13 ++ .../streaming/internal/ParquetRowBuffer.java | 54 +++-- .../streaming/internal/RowBufferStats.java | 97 ++++++++- .../ingest/utils/ParameterProvider.java | 42 ++-- .../ingest/utils/SubColumnFinder.java | 69 +++++++ .../net/snowflake/ingest/utils/Utils.java | 25 ++- .../streaming/internal/BlobBuilderTest.java | 6 +- .../streaming/internal/ChannelDataTest.java | 45 +++- .../internal/FileColumnPropertiesTest.java | 12 +- .../streaming/internal/FlushServiceTest.java | 22 +- .../IcebergParquetValueParserTest.java | 147 ++++++++----- .../internal/RowBufferStatsTest.java | 194 ++++++++++++++---- .../streaming/internal/RowBufferTest.java | 70 +++++-- .../SnowflakeParquetValueParserTest.java | 44 ++-- .../SnowflakeStreamingIngestClientTest.java | 16 +- .../ingest/utils/SubColumnFinderTest.java | 140 +++++++++++++ 24 files changed, 911 insertions(+), 215 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java create mode 100644 src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index fa502f30a..8e16b734d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; @@ -642,13 +642,17 @@ public synchronized void close(String name) { * * @param rowCount: count of rows in the given buffer * @param colStats: map of column name to RowBufferStats - * @param setAllDefaultValues: whether to set default values for all null fields the EPs - * irrespective of the data type of this column + * @param setAllDefaultValues: whether to set default values for all null min/max fields in the EPs + * @param enableDistinctValuesCount: whether to include a valid NDV in the EPs irrespective of the + * data type of this column * @return the EPs built from column stats */ static EpInfo buildEpInfoFromStats( - long rowCount, Map colStats, boolean setAllDefaultValues) { - EpInfo epInfo = new EpInfo(rowCount, new HashMap<>()); + long rowCount, + Map colStats, + boolean setAllDefaultValues, + boolean enableDistinctValuesCount) { + EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount); for (Map.Entry colStat : colStats.entrySet()) { RowBufferStats stat = colStat.getValue(); FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 3e1de452a..73774b144 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -31,6 +31,7 @@ import net.snowflake.ingest.utils.Cryptor; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; +import net.snowflake.ingest.utils.Utils; import org.apache.commons.codec.binary.Hex; import org.apache.parquet.hadoop.ParquetFileWriter; @@ -90,6 +91,7 @@ static Blob constructBlobAndMetadata( final byte[] compressedChunkData; final int chunkLength; final int compressedChunkDataSize; + int extendedMetadataSize = -1; if (internalParameterProvider.getEnableChunkEncryption()) { Pair paddedChunk = @@ -111,6 +113,10 @@ static Blob constructBlobAndMetadata( compressedChunkData = serializedChunk.chunkData.toByteArray(); chunkLength = compressedChunkData.length; compressedChunkDataSize = chunkLength; + + if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { + extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength); + } } // Compute the md5 of the chunk data @@ -135,7 +141,8 @@ static Blob constructBlobAndMetadata( AbstractRowBuffer.buildEpInfoFromStats( serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined, - internalParameterProvider.setAllDefaultValuesInEp()) + internalParameterProvider.setAllDefaultValuesInEp(), + internalParameterProvider.isEnableDistinctValuesCount())) .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst()) .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond()); @@ -145,7 +152,7 @@ static Blob constructBlobAndMetadata( .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) - .setExtendedMetadataSize(-1L); + .setExtendedMetadataSize((long) extendedMetadataSize); } ChunkMetadata chunkMetadata = chunkMetadataBuilder.build(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java index 81f49d2fd..3ad8855cc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java +++ 
b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelData.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 9009642b3..c4cb56534 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -25,6 +25,10 @@ public class ClientBufferParameters { private boolean isIcebergMode; + private boolean enableDistinctValuesCount; + + private boolean enableValuesCount; + /** * Private constructor used for test methods * @@ -38,13 +42,17 @@ private ClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - boolean isIcebergMode) { + boolean isIcebergMode, + boolean enableDistinctValuesCount, + boolean enableValuesCount) { this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; this.maxRowGroups = maxRowGroups; this.isIcebergMode = isIcebergMode; + this.enableDistinctValuesCount = enableDistinctValuesCount; + this.enableValuesCount = enableValuesCount; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -65,14 +73,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; this.isIcebergMode = clientInternal != null ? clientInternal.isIcebergMode() : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; this.maxRowGroups = isIcebergMode ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT) : Optional.empty(); + this.enableDistinctValuesCount = + clientInternal != null + ? clientInternal.getInternalParameterProvider().isEnableDistinctValuesCount() + : InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT; + this.enableValuesCount = + clientInternal != null + ? clientInternal.getInternalParameterProvider().isEnableValuesCount() + : InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT; } /** * @@ -87,14 +103,18 @@ public static ClientBufferParameters test_createClientBufferParameters( Constants.BdecParquetCompression bdecParquetCompression, boolean enableNewJsonParsingLogic, Optional maxRowGroups, - boolean isIcebergMode) { + boolean isIcebergMode, + boolean enableDistinctValuesCount, + boolean enableValuesCount) { return new ClientBufferParameters( maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, enableNewJsonParsingLogic, maxRowGroups, - isIcebergMode); + isIcebergMode, + enableDistinctValuesCount, + enableValuesCount); } public long getMaxChunkSizeInBytes() { @@ -125,6 +145,14 @@ public String getParquetMessageTypeName() { return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME; } + public boolean isEnableDistinctValuesCount() { + return enableDistinctValuesCount; + } + + public boolean isEnableValuesCount() { + return enableValuesCount; + } + public boolean isEnableDictionaryEncoding() { return isIcebergMode; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java index e6e6a4d9d..28203cfd5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java @@ -13,12 +13,18 @@ class EpInfo { private Map columnEps; + private boolean enableDistinctValuesCount; + /** Default constructor, needed for Jackson */ EpInfo() {} - EpInfo(long rowCount, Map columnEps) { + EpInfo( + long rowCount, + Map columnEps, + boolean enableDistinctValuesCount) { this.rowCount = rowCount; this.columnEps = columnEps; + this.enableDistinctValuesCount = enableDistinctValuesCount; } /** Some basic verification logic to make sure the EP info is correct */ @@ -35,8 +41,8 @@ public void verifyEpInfo() { colName, colEp.getNullCount(), rowCount)); } - // Make sure the NDV should always be -1 - if (colEp.getDistinctValues() != EP_NDV_UNKNOWN) { + // Make sure the NDV is always -1 when distinct values counting is disabled + if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) { throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java index b3c7aedf5..fcb758e31 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java @@ -44,6 +44,9 @@ class FileColumnProperties { private long nullCount; + // for elements in repeated columns + private Long numberOfValues; + // for binary or string columns private long maxLength; @@ -110,6 +113,7 @@ class FileColumnProperties { this.setMinStrNonCollated(null); this.setNullCount(stats.getCurrentNullCount()); this.setDistinctValues(stats.getDistinctValues()); + this.setNumberOfValues(stats.getNumberOfValues()); } private void setIntValues(RowBufferStats stats) { @@ -284,6 +288,16 @@ void setMaxStrNonCollated(String maxStrNonCollated) { this.maxStrNonCollated = maxStrNonCollated; } + @JsonProperty("numberOfValues") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + Long getNumberOfValues() { + return numberOfValues; + } + + void setNumberOfValues(Long numberOfValues) { + this.numberOfValues = numberOfValues; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("{"); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 2e64f77b8..d11762340 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -597,11 +597,13 @@ BlobMetadata buildAndUpload( InvalidKeyException { Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency); - InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider(); // Construct the blob along with the metadata of the blob BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata(
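// A hedged aside, not part of this patch: constructBlobAndMetadata (see the BlobBuilder hunk above)
// initializes extendedMetadataSize to -1 and only computes a real value on the unencrypted path where
// setIcebergSpecificFieldsInEp() holds, so encrypted BDEC chunks continue to report -1 in ChunkMetadata.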
blobPath.fileName, blobData, bdecVersion, paramProvider); + blobPath.fileName, + blobData, + bdecVersion, + this.owningClient.getInternalParameterProvider()); blob.blobStats.setBuildDurationMs(buildContext); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index 963dbf188..7c2f90710 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -25,6 +25,7 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.SubColumnFinder; import net.snowflake.ingest.utils.Utils; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -49,6 +50,7 @@ class IcebergParquetValueParser { * @param value column value provided by user in a row * @param type Parquet column type * @param statsMap column stats map to update + * @param subColumnFinder helper class to find stats of sub-columns * @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in * insertRows API, and not buffered row) @@ -58,17 +60,19 @@ static ParquetBufferValue parseColumnValueToParquet( Object value, Type type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, long insertRowsCurrIndex) { Utils.assertNotNull("Parquet column stats map", statsMap); return parseColumnValueToParquet( - value, type, statsMap, defaultTimezone, insertRowsCurrIndex, null, false); + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, null, false); } private static ParquetBufferValue parseColumnValueToParquet( Object value, Type type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, long insertRowsCurrIndex, String path, @@ -152,6 +156,7 @@ private static ParquetBufferValue parseColumnValueToParquet( value, type.asGroupType(), statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, path, @@ -164,9 +169,9 @@ private static ParquetBufferValue parseColumnValueToParquet( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, path, "Passed null to non nullable field"); } - if (type.isPrimitive()) { - statsMap.get(path).incCurrentNullCount(); - } + subColumnFinder + .getSubColumns(path) + .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount()); } return new ParquetBufferValue(value, estimatedParquetSize); @@ -366,6 +371,7 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) { * @param value value to parse * @param type Parquet column type * @param statsMap column stats map to update + * @param subColumnFinder helper class to find stats of sub-columns * @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API * @param path dot path of the column @@ -376,6 +382,7 @@ private static ParquetBufferValue getGroupValue( Object value, GroupType type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, String path, @@ -386,16 +393,19 @@ private static ParquetBufferValue getGroupValue( value, type, statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, path, 
isDescendantsOfRepeatingGroup); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { - return get3LevelListValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path); + return get3LevelListValue( + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { - return get3LevelMapValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path); + return get3LevelMapValue( + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path); } throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getClass().getSimpleName()); @@ -410,6 +420,7 @@ private static ParquetBufferValue getStructValue( Object value, GroupType type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, String path, @@ -425,6 +436,7 @@ private static ParquetBufferValue getStructValue( structVal.getOrDefault(type.getFieldName(i), null), type.getType(i), statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, path, @@ -457,6 +469,7 @@ private static ParquetBufferValue get3LevelListValue( Object value, GroupType type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, String path) { @@ -471,6 +484,7 @@ private static ParquetBufferValue get3LevelListValue( val, type.getType(0).asGroupType().getType(0), statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, listGroupPath, @@ -492,6 +506,7 @@ private static ParquetBufferValue get3LevelMapValue( Object value, GroupType type, Map statsMap, + SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, String path) { @@ -506,6 +521,7 @@ private static ParquetBufferValue get3LevelMapValue( entry.getKey(), type.getType(0).asGroupType().getType(0), statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, mapGroupPath, @@ -515,6 +531,7 @@ private static ParquetBufferValue get3LevelMapValue( entry.getValue(), type.getType(0).asGroupType().getType(1), statsMap, + subColumnFinder, defaultTimezone, insertRowsCurrIndex, mapGroupPath, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index 4ab8ecc4b..d6589bf93 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -7,6 +7,8 @@ /** A class to provide non-configurable constants depends on Iceberg or non-Iceberg mode */ class InternalParameterProvider { public static final Integer MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT = 1; + public static final boolean ENABLE_DISTINCT_VALUES_COUNT_DEFAULT = false; + public static final boolean ENABLE_VALUES_COUNT_DEFAULT = false; private final boolean isIcebergMode; @@ -33,4 +35,15 @@ boolean setIcebergSpecificFieldsInEp() { // in the EP metadata, createdOn, and extendedMetadataSize. return isIcebergMode; } + + boolean isEnableDistinctValuesCount() { + // When in Iceberg mode, we enable distinct values count in EP metadata. + return isIcebergMode; + } + + boolean isEnableValuesCount() { + // When in Iceberg mode, we enable values count in EP metadata for repeated groups (e.g. map, + list). 
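// Hedged illustration with made-up values, not from this patch: for a column MAP_COL of type
// map<int, int>, ingesting the rows {1:1} and {2:2, 3:3} leaves numberOfValues == 3 on both leaf stats
// MAP_COL.key_value.key and MAP_COL.key_value.value, since each map entry contributes one value to each leaf.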
+ return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 5e3fa1191..b972404f7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -25,6 +25,7 @@ import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.SubColumnFinder; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.MessageType; @@ -49,6 +50,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { private final ParquetProperties.WriterVersion parquetWriterVersion; private MessageType schema; + private SubColumnFinder statsFinder; /** Construct a ParquetRowBuffer object. */ ParquetRowBuffer( @@ -113,7 +115,9 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null)); + parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null, + clientBufferParameters.isEnableDistinctValuesCount(), + false /* enableValuesCount */)); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { @@ -128,7 +132,9 @@ public void setupSchema(List columns) { column.getCollation(), column.getOrdinal(), null /* fieldId */, - parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null)); + parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null, + clientBufferParameters.isEnableDistinctValuesCount(), + false /* enableValuesCount */)); } } @@ -186,31 +192,46 @@ public void setupSchema(List columns) { */ if (clientBufferParameters.getIsIcebergMode()) { for (ColumnDescriptor columnDescriptor : schema.getColumns()) { - String columnPath = concatDotPath(columnDescriptor.getPath()); + String[] path = columnDescriptor.getPath(); + String columnDotPath = concatDotPath(path); PrimitiveType primitiveType = columnDescriptor.getPrimitiveType(); + boolean isInRepeatedGroup = + path.length > 1 + && schema + .getType(Arrays.copyOf(path, path.length - 1)) + .isRepetition(Type.Repetition.REPEATED); /* set fieldId to 0 for non-structured columns */ - int fieldId = columnDescriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue(); + int fieldId = path.length == 1 ? 
0 : primitiveType.getId().intValue(); int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue(); this.statsMap.put( - columnPath, + columnDotPath, new RowBufferStats( - columnPath, null /* collationDefinitionString */, ordinal, fieldId, primitiveType)); + columnDotPath, + null /* collationDefinitionString */, + ordinal, + fieldId, + primitiveType, + clientBufferParameters.isEnableDistinctValuesCount(), + clientBufferParameters.isEnableValuesCount() && isInRepeatedGroup)); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { this.tempStatsMap.put( - columnPath, + columnDotPath, new RowBufferStats( - columnPath, + columnDotPath, null /* collationDefinitionString */, ordinal, fieldId, - primitiveType)); + primitiveType, + clientBufferParameters.isEnableDistinctValuesCount(), + clientBufferParameters.isEnableValuesCount() && isInRepeatedGroup)); } } } + statsFinder = new SubColumnFinder(schema); tempData.clear(); data.clear(); } @@ -267,7 +288,7 @@ private float addRow( // Create new empty stats just for the current row. Map forkedStatsMap = new HashMap<>(); - statsMap.forEach((columnName, stats) -> forkedStatsMap.put(columnName, stats.forkEmpty())); + statsMap.forEach((columnPath, stats) -> forkedStatsMap.put(columnPath, stats.forkEmpty())); for (Map.Entry entry : row.entrySet()) { String key = entry.getKey(); @@ -279,7 +300,12 @@ private float addRow( ParquetBufferValue valueWithSize = (clientBufferParameters.getIsIcebergMode() ? IcebergParquetValueParser.parseColumnValueToParquet( - value, parquetColumn.type, forkedStatsMap, defaultTimezone, insertRowsCurrIndex) + value, + parquetColumn.type, + forkedStatsMap, + statsFinder, + defaultTimezone, + insertRowsCurrIndex) : SnowflakeParquetValueParser.parseColumnValueToParquet( value, column, @@ -313,9 +339,11 @@ private float addRow( RowBufferStats.getCombinedStats(statsMap.get(columnName), forkedColStats.getValue())); } - // Increment null count for column missing in the input map + // Increment null count for column and its sub-columns missing in the input map for (String columnName : Sets.difference(this.fieldIndex.keySet(), inputColumnNames)) { - statsMap.get(columnName).incCurrentNullCount(); + statsFinder + .getSubColumns(columnName) + .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount()); } return size; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java index 4d7781c78..131f603f0 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java @@ -8,7 +8,10 @@ import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.PrimitiveType; @@ -42,26 +45,50 @@ class RowBufferStats { // for binary or string columns private long currentMaxLength; + private final boolean enableDistinctValuesCount; + private Set distinctValues; + private final boolean enableValuesCount; + private Long numberOfValues; + RowBufferStats( String columnDisplayName, String collationDefinitionString, int ordinal, Integer fieldId, - PrimitiveType primitiveType) { + PrimitiveType primitiveType, + boolean 
enableDistinctValuesCount, + boolean enableValuesCount) { this.columnDisplayName = columnDisplayName; this.collationDefinitionString = collationDefinitionString; this.ordinal = ordinal; this.fieldId = fieldId; this.primitiveType = primitiveType; + this.enableDistinctValuesCount = enableDistinctValuesCount; + this.enableValuesCount = enableValuesCount; + if (enableDistinctValuesCount) { + this.distinctValues = new HashSet<>(); + } reset(); } - RowBufferStats(String columnDisplayName) { - this(columnDisplayName, null, -1, null, null); + RowBufferStats( + String columnDisplayName, boolean enableDistinctValuesCount, boolean enableValuesCount) { + this(columnDisplayName, null, -1, null, null, enableDistinctValuesCount, enableValuesCount); } - RowBufferStats(String columnDisplayName, PrimitiveType primitiveType) { - this(columnDisplayName, null, -1, null, primitiveType); + RowBufferStats( + String columnDisplayName, + PrimitiveType primitiveType, + boolean enableDistinctValuesCount, + boolean enableValuesCount) { + this( + columnDisplayName, + null, + -1, + null, + primitiveType, + enableDistinctValuesCount, + enableValuesCount); } void reset() { @@ -73,6 +100,10 @@ void reset() { this.currentMinRealValue = null; this.currentNullCount = 0; this.currentMaxLength = 0; + if (distinctValues != null) { + distinctValues.clear(); + } + this.numberOfValues = 0L; } /** Create new statistics for the same column, with all calculated values set to empty */ @@ -82,7 +113,9 @@ RowBufferStats forkEmpty() { this.getCollationDefinitionString(), this.getOrdinal(), this.getFieldId(), - this.getPrimitiveType()); + this.getPrimitiveType(), + this.enableDistinctValuesCount, + this.enableValuesCount); } // TODO performance test this vs in place update @@ -95,13 +128,25 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right "left=%s, right=%s", left.getCollationDefinitionString(), right.getCollationDefinitionString())); } + + if (left.enableDistinctValuesCount != right.enableDistinctValuesCount) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Tried to combine stats for different distinct value settings", + String.format( + "left=%s, right=%s", + left.enableDistinctValuesCount, right.enableDistinctValuesCount)); + } + RowBufferStats combined = new RowBufferStats( left.columnDisplayName, left.getCollationDefinitionString(), left.getOrdinal(), left.getFieldId(), - left.getPrimitiveType()); + left.getPrimitiveType(), + left.enableDistinctValuesCount, + left.enableValuesCount); if (left.currentMinIntValue != null) { combined.addIntValue(left.currentMinIntValue); @@ -133,6 +178,15 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right combined.addRealValue(right.currentMaxRealValue); } + if (combined.enableDistinctValuesCount) { + combined.distinctValues.addAll(left.distinctValues); + combined.distinctValues.addAll(right.distinctValues); + } + + if (combined.enableValuesCount) { + combined.numberOfValues = left.numberOfValues + right.numberOfValues; + } + combined.currentNullCount = left.currentNullCount + right.currentNullCount; combined.currentMaxLength = Math.max(left.currentMaxLength, right.currentMaxLength); @@ -145,7 +199,6 @@ void addStrValue(String value) { void addBinaryValue(byte[] valueBytes) { this.setCurrentMaxLength(valueBytes.length); - // Check if new min/max string if (this.currentMinStrValue == null) { this.currentMinStrValue = valueBytes; @@ -159,6 +212,13 @@ void addBinaryValue(byte[] valueBytes) { this.currentMaxStrValue = valueBytes; } 
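// Note on the distinct-value tracking just below: binary values enter the NDV set as
// Arrays.hashCode(valueBytes) rather than the bytes themselves, so the NDV reported for binary and
// string columns is an approximation that can undercount when distinct byte arrays collide on one hash.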
} + + if (enableDistinctValuesCount) { + distinctValues.add(Arrays.hashCode(valueBytes)); + } + if (enableValuesCount) { + numberOfValues++; + } } byte[] getCurrentMinStrValue() { @@ -179,6 +239,13 @@ void addIntValue(BigInteger value) { } else if (this.currentMaxIntValue.compareTo(value) < 0) { this.currentMaxIntValue = value; } + + if (enableDistinctValuesCount) { + distinctValues.add(value); + } + if (enableValuesCount) { + numberOfValues++; + } } BigInteger getCurrentMinIntValue() { @@ -199,6 +266,13 @@ void addRealValue(Double value) { } else if (this.currentMaxRealValue.compareTo(value) < 0) { this.currentMaxRealValue = value; } + + if (enableDistinctValuesCount) { + distinctValues.add(value); + } + if (enableValuesCount) { + numberOfValues++; + } } Double getCurrentMinRealValue() { @@ -233,7 +307,12 @@ long getCurrentMaxLength() { * @return -1 indicating the NDV is unknown */ long getDistinctValues() { - return EP_NDV_UNKNOWN; + return enableDistinctValuesCount ? distinctValues.size() : EP_NDV_UNKNOWN; + } + + // TODO: change default to -1 after Oct 17 + Long getNumberOfValues() { + return enableValuesCount ? numberOfValues : null; } String getCollationDefinitionString() { diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 3d04aa1b2..b86e59525 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -157,73 +157,85 @@ private void setParameterMap( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_INTERVAL_IN_MILLIS, INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_BYTES, INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( ENABLE_SNOWPIPE_STREAMING_METRICS, SNOWPIPE_STREAMING_METRICS_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( - BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props, false); + BLOB_FORMAT_VERSION, + BLOB_FORMAT_VERSION_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); getBlobFormatVersion(); // to verify parsing the configured value this.checkAndUpdate( - IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props, false); + IO_TIME_CPU_RATIO, + IO_TIME_CPU_RATIO_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); this.checkAndUpdate( BLOB_UPLOAD_MAX_RETRY_COUNT, BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( - MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, false); + MAX_CHUNK_SIZE_IN_BYTES, + MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false /* 
enforceDefault */); this.checkAndUpdate( MAX_CLIENT_LAG, isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( MAX_CHUNKS_IN_BLOB, @@ -237,21 +249,21 @@ private void setParameterMap( MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); this.checkAndUpdate( ENABLE_NEW_JSON_PARSING_LOGIC, ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, parameterOverrides, props, - false); + false /* enforceDefault */); if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) { throw new IllegalArgumentException( diff --git a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java new file mode 100644 index 000000000..2b71eea27 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.utils; + +import static net.snowflake.ingest.utils.Utils.concatDotPath; +import static net.snowflake.ingest.utils.Utils.isNullOrEmpty; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +/** Helper class to find all leaf sub-columns in an immutable schema given a dot path. */ +public class SubColumnFinder { + static class SubtreeInterval { + final int startTag; + final int endTag; + + SubtreeInterval(int startTag, int endTag) { + this.startTag = startTag; + this.endTag = endTag; + } + } + + private final List list; + private final Map accessMap; + + public SubColumnFinder(MessageType schema) { + accessMap = new HashMap<>(); + list = new ArrayList<>(); + build(schema, null); + } + + public List getSubColumns(String dotPath) { + if (!accessMap.containsKey(dotPath)) { + throw new IllegalArgumentException(String.format("Column %s not found in schema", dotPath)); + } + SubtreeInterval interval = accessMap.get(dotPath); + return Collections.unmodifiableList(list.subList(interval.startTag, interval.endTag)); + } + + private void build(Type node, String dotPath) { + if (dotPath == null) { + /* Ignore root node type name (bdec or schema) */ + dotPath = ""; + } else if (dotPath.isEmpty()) { + dotPath = node.getName(); + } else { + dotPath = concatDotPath(dotPath, node.getName()); + } + + int startTag = list.size(); + if (!node.isPrimitive()) { + for (Type child : node.asGroupType().getFields()) { + build(child, dotPath); + } + } else { + list.add(dotPath); + } + if (!isNullOrEmpty(dotPath)) { + accessMap.put(dotPath, new SubtreeInterval(startTag, list.size())); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 95d941036..becbfb46a 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -8,6 +8,7 @@ import com.codahale.metrics.Timer; import io.netty.util.internal.PlatformDependent; +import java.io.IOException; import java.io.StringReader; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; @@ -29,6 +30,8 @@ import 
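// A hedged worked example for SubColumnFinder above (hypothetical schema, not from this patch): given
// a message schema { optional group MAP_COL (MAP) { repeated group key_value { required int32 key; optional int32 value; } } },
// the DFS appends leaves in order [MAP_COL.key_value.key, MAP_COL.key_value.value], so
// getSubColumns("MAP_COL") returns that whole slice while getSubColumns("MAP_COL.key_value.key")
// returns just the leaf itself. Every dot path maps to one contiguous [startTag, endTag) window over a
// single shared leaf list, so a lookup is a map hit plus a subList view, with no per-call copying.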
java.util.Properties; import net.snowflake.client.core.SFSessionProperty; import org.apache.commons.codec.binary.Base64; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.ParquetFileWriter; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; @@ -412,7 +415,7 @@ public static String getFullyQualifiedChannelName( return String.format("%s.%s.%s.%s", dbName, schemaName, tableName, channelName); } - /* + /** * Get concat dot path, check if any path is empty or null * * @param path the path @@ -430,4 +433,24 @@ public static String concatDotPath(String... path) { } return sb.toString(); } + + /** + * Get the extended metadata size (footer size) from a parquet file + * + * @param bytes the serialized parquet file + * @param length the length of the byte array without padding + * @return the extended metadata size + */ + public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException { + final int magicOffset = length - ParquetFileWriter.MAGIC.length; + final int footerSizeOffset = magicOffset - Integer.BYTES; + if (bytes.length < length + || footerSizeOffset < 0 + || !ParquetFileWriter.MAGIC_STR.equals( + new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) { + throw new IllegalArgumentException("Invalid parquet file"); + } + + return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 04a740272..51aa1c289 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -121,8 +121,10 @@ private List> createChannelDataPerTable(int metada Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .id(1) - .named("test")) - : new RowBufferStats(columnName, null, 1, null, null)); + .named("test"), + isIceberg, + isIceberg) + : new RowBufferStats(columnName, null, 1, null, null, false, false)); channelData.setChannelContext( new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L)); return Collections.singletonList(channelData); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index 5b545d697..52b719484 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
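 * (A hedged sketch of the layout Utils.getExtendedMetadataSize above relies on, assuming the standard
 * Parquet trailer rather than anything specific to this patch: a file ends with
 * [serialized footer][4-byte little-endian footer length][4-byte magic "PAR1"], so the method reads the
 * int at offset length - MAGIC.length - Integer.BYTES to recover the footer size.)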
+ */ + package net.snowflake.ingest.streaming.internal; import java.math.BigInteger; @@ -8,13 +12,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ChannelDataTest { + @Parameterized.Parameters(name = "enableNDVAndNV: {0}") + public static Object[] enableNDVAndNV() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean enableNDVAndNV; @Test public void testGetCombinedColumnStatsMapNulls() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); @@ -43,12 +56,12 @@ public void testGetCombinedColumnStatsMapNulls() { @Test public void testGetCombinedColumnStatsMapMissingColumn() { Map left = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); left.put("one", leftStats1); leftStats1.addIntValue(new BigInteger("10")); Map right = new HashMap<>(); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); right.put("foo", rightStats1); rightStats1.addIntValue(new BigInteger("11")); @@ -78,10 +91,10 @@ public void testGetCombinedColumnStatsMap() { Map left = new HashMap<>(); Map right = new HashMap<>(); - RowBufferStats leftStats1 = new RowBufferStats("COL1"); - RowBufferStats rightStats1 = new RowBufferStats("COL1"); - RowBufferStats leftStats2 = new RowBufferStats("COL1"); - RowBufferStats rightStats2 = new RowBufferStats("COL1"); + RowBufferStats leftStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats rightStats1 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats leftStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); + RowBufferStats rightStats2 = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV); left.put("one", leftStats1); left.put("two", leftStats2); @@ -107,20 +120,34 @@ public void testGetCombinedColumnStatsMap() { Assert.assertEquals(new BigInteger("10"), oneCombined.getCurrentMinIntValue()); Assert.assertEquals(new BigInteger("17"), oneCombined.getCurrentMaxIntValue()); - Assert.assertEquals(-1, oneCombined.getDistinctValues()); Assert.assertNull(oneCombined.getCurrentMinStrValue()); Assert.assertNull(oneCombined.getCurrentMaxStrValue()); Assert.assertNull(oneCombined.getCurrentMinRealValue()); Assert.assertNull(oneCombined.getCurrentMaxRealValue()); + if (enableNDVAndNV) { + Assert.assertEquals(5, oneCombined.getDistinctValues()); + Assert.assertEquals(5, oneCombined.getNumberOfValues().longValue()); + } else { + Assert.assertEquals(-1, oneCombined.getDistinctValues()); + Assert.assertNull(oneCombined.getNumberOfValues()); + } + Assert.assertArrayEquals( "10".getBytes(StandardCharsets.UTF_8), twoCombined.getCurrentMinStrValue()); Assert.assertArrayEquals( "17".getBytes(StandardCharsets.UTF_8), twoCombined.getCurrentMaxStrValue()); - Assert.assertEquals(-1, twoCombined.getDistinctValues()); Assert.assertNull(twoCombined.getCurrentMinIntValue()); Assert.assertNull(twoCombined.getCurrentMaxIntValue()); Assert.assertNull(twoCombined.getCurrentMinRealValue()); 
Assert.assertNull(twoCombined.getCurrentMaxRealValue()); + + if (enableNDVAndNV) { + Assert.assertEquals(5, twoCombined.getDistinctValues()); + Assert.assertEquals(5, twoCombined.getNumberOfValues().longValue()); + } else { + Assert.assertEquals(-1, twoCombined.getDistinctValues()); + Assert.assertNull(twoCombined.getNumberOfValues()); + } } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java index f4ffa11c4..0818acfea 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java @@ -32,8 +32,10 @@ public void testFileColumnPropertiesConstructor() { Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .id(1) - .named("test")) - : new RowBufferStats("COL", null, 1, null, null); + .named("test"), + isIceberg, + isIceberg) + : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("bcd"); stats.addStrValue("abcde"); FileColumnProperties props = new FileColumnProperties(stats, !isIceberg); @@ -55,8 +57,10 @@ public void testFileColumnPropertiesConstructor() { Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) .as(LogicalTypeAnnotation.stringType()) .id(1) - .named("test")) - : new RowBufferStats("COL", null, 1, null, null); + .named("test"), + isIceberg, + isIceberg) + : new RowBufferStats("COL", null, 1, null, null, false, false); stats.addStrValue("aßßßßßßßßßßßßßßßß"); Assert.assertEquals(33, stats.getCurrentMinStrValue().length); props = new FileColumnProperties(stats, !isIceberg); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index a7e4ba35b..2a1a3d97b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -884,10 +884,16 @@ public void testBuildAndUpload() throws Exception { RowBufferStats stats1 = new RowBufferStats( - "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")); + "COL1", + Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), + isIcebergMode, + isIcebergMode); RowBufferStats stats2 = new RowBufferStats( - "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")); + "COL1", + Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), + isIcebergMode, + isIcebergMode); eps1.put("one", stats1); eps2.put("one", stats2); @@ -919,7 +925,7 @@ public void testBuildAndUpload() throws Exception { EpInfo expectedChunkEpInfo = AbstractRowBuffer.buildEpInfoFromStats( - 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode); + 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode, isIcebergMode); ChannelMetadata expectedChannel1Metadata = ChannelMetadata.builder() @@ -1049,8 +1055,11 @@ public void testInvalidateChannels() { Mockito.mock(SnowflakeStreamingIngestClientInternal.class); ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); + InternalParameterProvider internalParameterProvider = + new InternalParameterProvider(isIcebergMode); Mockito.when(client.getChannelCache()).thenReturn(channelCache); 
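// The stub added just below mirrors the new production dependency: buildAndUpload now pulls EP settings
// through client.getInternalParameterProvider() (see the FlushService change above), so mocked clients
// in these tests stub that accessor as well to avoid handing the flush path a null provider.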
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); + Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); SnowflakeStreamingIngestChannelInternal channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1134,13 +1143,16 @@ public void testBlobBuilder() throws Exception { RowBufferStats stats1 = new RowBufferStats( - "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")); + "COL1", + Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"), + isIcebergMode, + isIcebergMode); eps1.put("one", stats1); stats1.addIntValue(new BigInteger("10")); stats1.addIntValue(new BigInteger("15")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode, isIcebergMode); ChannelMetadata channelMetadata = ChannelMetadata.builder() diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 007dc3e23..279e100fd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -21,32 +21,46 @@ import java.util.Map; import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.SubColumnFinder; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; import org.apache.parquet.schema.Types; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class IcebergParquetValueParserTest { - static ObjectMapper objectMapper = new ObjectMapper(); + static ObjectMapper objectMapper; + static SubColumnFinder mockSubColumnFinder; + + @Before + public void setUp() { + objectMapper = new ObjectMapper(); + mockSubColumnFinder = Mockito.mock(SubColumnFinder.class); + Mockito.when(mockSubColumnFinder.getSubColumns(Mockito.anyString())) + .thenReturn(Collections.emptyList()); + } @Test public void parseValueBoolean() { Type type = Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true, true); Map rowBufferStatsMap = new HashMap() { { put("BOOLEAN_COL", rowBufferStats); } }; + ParquetBufferValue pv = - IcebergParquetValueParser.parseColumnValueToParquet(true, type, rowBufferStatsMap, UTC, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + true, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -61,7 +75,7 @@ public void parseValueBoolean() { public void parseValueInt() { Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL"); - RowBufferStats rowBufferStats = new RowBufferStats("INT_COL"); + RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true, true); Map rowBufferStatsMap = new HashMap() { { @@ -70,7 +84,7 @@ public void parseValueInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - 
-            Integer.MAX_VALUE, type, rowBufferStatsMap, UTC, 0);
+            Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -88,7 +102,7 @@ public void parseValueDecimalToInt() {
             .as(LogicalTypeAnnotation.decimalType(4, 9))
             .named("DECIMAL_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -97,7 +111,7 @@ public void parseValueDecimalToInt() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            new BigDecimal("12345.6789"), type, rowBufferStatsMap, UTC, 0);
+            new BigDecimal("12345.6789"), type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -115,7 +129,7 @@ public void parseValueDateToInt() {
             .as(LogicalTypeAnnotation.dateType())
             .named("DATE_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -124,7 +138,7 @@ public void parseValueDateToInt() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            "2024-01-01", type, rowBufferStatsMap, UTC, 0);
+            "2024-01-01", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -139,7 +153,7 @@ public void parseValueDateToInt() {
   public void parseValueLong() {
     Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -148,7 +162,7 @@ public void parseValueLong() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            Long.MAX_VALUE, type, rowBufferStatsMap, UTC, 0);
+            Long.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -166,7 +180,7 @@ public void parseValueDecimalToLong() {
             .as(LogicalTypeAnnotation.decimalType(9, 18))
             .named("DECIMAL_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -175,7 +189,12 @@ public void parseValueDecimalToLong() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            new BigDecimal("123456789.123456789"), type, rowBufferStatsMap, UTC, 0);
+            new BigDecimal("123456789.123456789"),
+            type,
+            rowBufferStatsMap,
+            mockSubColumnFinder,
+            UTC,
+            0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -193,7 +212,7 @@ public void parseValueTimeToLong() {
             .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIME_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -202,7 +221,7 @@ public void parseValueTimeToLong() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            "12:34:56.789", type, rowBufferStatsMap, UTC, 0);
+            "12:34:56.789", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -220,7 +239,7 @@ public void parseValueTimestampToLong() {
             .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIMESTAMP_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -229,7 +248,7 @@ public void parseValueTimestampToLong() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, UTC, 0);
+            "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -247,7 +266,7 @@ public void parseValueTimestampTZToLong() {
             .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIMESTAMP_TZ_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -256,7 +275,7 @@ public void parseValueTimestampTZToLong() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, UTC, 0);
+            "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -271,7 +290,7 @@ public void parseValueTimestampTZToLong() {
   public void parseValueFloat() {
     Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -280,7 +299,7 @@ public void parseValueFloat() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            Float.MAX_VALUE, type, rowBufferStatsMap, UTC, 0);
+            Float.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -295,7 +314,7 @@ public void parseValueDouble() {
     Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -304,7 +323,7 @@ public void parseValueDouble() {
         };
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            Double.MAX_VALUE, type, rowBufferStatsMap, UTC, 0);
+            Double.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -319,7 +338,7 @@ public void parseValueDouble() {
   public void parseValueBinary() {
     Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -328,7 +347,8 @@ public void parseValueBinary() {
         };
     byte[] value = "snowflake_to_the_moon".getBytes();
     ParquetBufferValue pv =
-        IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStatsMap, UTC, 0);
+        IcebergParquetValueParser.parseColumnValueToParquet(
+            value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -347,7 +367,7 @@ public void parseValueStringToBinary() {
             .as(LogicalTypeAnnotation.stringType())
             .named("BINARY_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -356,7 +376,8 @@ public void parseValueStringToBinary() {
         };
     String value = "snowflake_to_the_moon";
     ParquetBufferValue pv =
-        IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStatsMap, UTC, 0);
+        IcebergParquetValueParser.parseColumnValueToParquet(
+            value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -377,7 +398,7 @@ public void parseValueFixed() {
             .length(4)
             .named("FIXED_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -386,7 +407,8 @@ public void parseValueFixed() {
         };
     byte[] value = "snow".getBytes();
     ParquetBufferValue pv =
-        IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStatsMap, UTC, 0);
+        IcebergParquetValueParser.parseColumnValueToParquet(
+            value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -406,7 +428,7 @@ public void parseValueDecimalToFixed() {
             .as(LogicalTypeAnnotation.decimalType(10, 20))
             .named("FIXED_COL");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL");
+    RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -415,7 +437,8 @@ public void parseValueDecimalToFixed() {
         };
     BigDecimal value = new BigDecimal("1234567890.0123456789");
     ParquetBufferValue pv =
-        IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStatsMap, UTC, 0);
+        IcebergParquetValueParser.parseColumnValueToParquet(
+            value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .rowBufferStats(rowBufferStats)
@@ -433,7 +456,7 @@ public void parseList() throws JsonProcessingException {
         Types.optionalList()
             .element(Types.optional(PrimitiveTypeName.INT32).named("element"))
            .named("LIST_COL");
-    RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element");
+    RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -441,10 +464,11 @@ public void parseList() throws JsonProcessingException {
           }
         };
 
-    IcebergParquetValueParser.parseColumnValueToParquet(null, list, rowBufferStatsMap, UTC, 0);
+    IcebergParquetValueParser.parseColumnValueToParquet(
+        null, list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            Arrays.asList(1, 2, 3, 4, 5), list, rowBufferStatsMap, UTC, 0);
+            Arrays.asList(1, 2, 3, 4, 5), list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .rowBufferStats(rowBufferStats)
         .parquetBufferValue(pv)
@@ -467,10 +491,10 @@ public void parseList() throws JsonProcessingException {
         SFException.class,
         () ->
             IcebergParquetValueParser.parseColumnValueToParquet(
-                null, requiredList, rowBufferStatsMap, UTC, 0));
+                null, requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0));
     pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            new ArrayList<>(), requiredList, rowBufferStatsMap, UTC, 0);
+            new ArrayList<>(), requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .rowBufferStats(rowBufferStats)
         .parquetBufferValue(pv)
@@ -490,7 +514,12 @@ public void parseList() throws JsonProcessingException {
         SFException.class,
         () ->
             IcebergParquetValueParser.parseColumnValueToParquet(
-                Collections.singletonList(null), requiredElements, rowBufferStatsMap, UTC, 0));
+                Collections.singletonList(null),
+                requiredElements,
+                rowBufferStatsMap,
+                mockSubColumnFinder,
+                UTC,
+                0));
   }
 
   @Test
@@ -500,8 +529,8 @@ public void parseMap() throws JsonProcessingException {
             .key(Types.required(PrimitiveTypeName.INT32).named("key"))
             .value(Types.optional(PrimitiveTypeName.INT32).named("value"))
             .named("MAP_COL");
-    RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key");
-    RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value");
+    RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true, true);
+    RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -509,7 +538,8 @@ public void parseMap() throws JsonProcessingException {
            put("MAP_COL.key_value.value", rowBufferValueStats);
           }
         };
-    IcebergParquetValueParser.parseColumnValueToParquet(null, map, rowBufferStatsMap, UTC, 0);
+    IcebergParquetValueParser.parseColumnValueToParquet(
+        null, map, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
             new java.util.HashMap() {
               {
@@ -520,6 +550,7 @@ public void parseMap() throws JsonProcessingException {
             },
             map,
             rowBufferStatsMap,
+            mockSubColumnFinder,
             UTC,
             0);
     ParquetValueParserAssertionBuilder.newBuilder()
@@ -546,10 +577,15 @@ public void parseMap() throws JsonProcessingException {
         SFException.class,
         () ->
            IcebergParquetValueParser.parseColumnValueToParquet(
-                null, requiredMap, rowBufferStatsMap, UTC, 0));
+                null, requiredMap, rowBufferStatsMap, mockSubColumnFinder, UTC, 0));
     pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            new java.util.HashMap(), requiredMap, rowBufferStatsMap, UTC, 0);
+            new java.util.HashMap(),
+            requiredMap,
+            rowBufferStatsMap,
+            mockSubColumnFinder,
+            UTC,
+            0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .rowBufferStats(rowBufferKeyStats)
         .parquetBufferValue(pv)
@@ -577,6 +613,7 @@ public void parseMap() throws JsonProcessingException {
                 },
                 requiredValues,
                 rowBufferStatsMap,
+                mockSubColumnFinder,
                 UTC,
                 0));
   }
@@ -592,8 +629,8 @@ public void parseStruct() throws JsonProcessingException {
                     .named("b"))
             .named("STRUCT_COL");
 
-    RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a");
-    RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b");
+    RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true, true);
+    RowBufferStats rowBufferBStats = new RowBufferStats("STRUCT_COL.b", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
@@ -602,7 +639,8 @@ public void parseStruct() throws JsonProcessingException {
           }
         };
 
-    IcebergParquetValueParser.parseColumnValueToParquet(null, struct, rowBufferStatsMap, UTC, 0);
+    IcebergParquetValueParser.parseColumnValueToParquet(
+        null, struct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
     Assert.assertThrows(
         SFException.class,
         () ->
@@ -614,6 +652,7 @@ public void parseStruct() throws JsonProcessingException {
                 },
                 struct,
                 rowBufferStatsMap,
+                mockSubColumnFinder,
                 UTC,
                 0));
     Assert.assertThrows(
@@ -627,6 +666,7 @@ public void parseStruct() throws JsonProcessingException {
                 },
                 struct,
                 rowBufferStatsMap,
+                mockSubColumnFinder,
                 UTC,
                 0));
     ParquetBufferValue pv =
@@ -640,6 +680,7 @@ public void parseStruct() throws JsonProcessingException {
                 }),
             struct,
             rowBufferStatsMap,
+            mockSubColumnFinder,
             UTC,
             0);
     ParquetValueParserAssertionBuilder.newBuilder()
@@ -664,10 +705,15 @@ public void parseStruct() throws JsonProcessingException {
         SFException.class,
         () ->
             IcebergParquetValueParser.parseColumnValueToParquet(
-                null, requiredStruct, rowBufferStatsMap, UTC, 0));
+                null, requiredStruct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0));
     pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
-            new java.util.HashMap(), requiredStruct, rowBufferStatsMap, UTC, 0);
+            new java.util.HashMap(),
+            requiredStruct,
+            rowBufferStatsMap,
+            mockSubColumnFinder,
+            UTC,
+            0);
     ParquetValueParserAssertionBuilder.newBuilder()
         .parquetBufferValue(pv)
         .expectedValueClass(ArrayList.class)
@@ -688,7 +734,7 @@ public void parseNestedTypes() {
       List reference = (List) res.getSecond();
       ParquetBufferValue pv =
          IcebergParquetValueParser.parseColumnValueToParquet(
-              value, type, rowBufferStatsMap, UTC, 0);
+              value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0);
       ParquetValueParserAssertionBuilder.newBuilder()
           .parquetBufferValue(pv)
           .expectedValueClass(ArrayList.class)
@@ -703,7 +749,7 @@ public void parseNestedTypes() {
   private static Type generateNestedTypeAndStats(
       int depth, String name, Map<String, RowBufferStats> rowBufferStatsMap, String path) {
     if (depth == 0) {
-      rowBufferStatsMap.put(path, new RowBufferStats(path));
+      rowBufferStatsMap.put(path, new RowBufferStats(path, true, true));
       return Types.optional(PrimitiveTypeName.INT32).named(name);
     }
     switch (depth % 3) {
@@ -718,7 +764,8 @@ private static Type generateNestedTypeAndStats(
             .addField(generateNestedTypeAndStats(depth - 1, "a", rowBufferStatsMap, path + ".a"))
             .named(name);
       case 0:
-        rowBufferStatsMap.put(path + ".key_value.key", new RowBufferStats(path + ".key_value.key"));
+        rowBufferStatsMap.put(
+            path + ".key_value.key", new RowBufferStats(path + ".key_value.key", true, true));
         return Types.optionalMap()
             .key(Types.required(PrimitiveTypeName.INT32).named("key"))
             .value(
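Every parseColumnValueToParquet call in the hunks above gains a SubColumnFinder argument, but the fixture that supplies mockSubColumnFinder lives in an earlier, unexcerpted part of IcebergParquetValueParserTest. A minimal sketch of one way such a fixture could look, assuming Mockito (already a test dependency of this SDK); the field name matches the call sites, everything else here is an assumption:

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import net.snowflake.ingest.utils.SubColumnFinder;

    // Hypothetical shared fixture: stub every dot path as its own single leaf
    // sub-column so the primitive-column stats assertions behave as before.
    class IcebergParquetValueParserTestFixture {
      static final SubColumnFinder mockSubColumnFinder = mock(SubColumnFinder.class);

      static {
        when(mockSubColumnFinder.getSubColumns(anyString()))
            .thenAnswer(invocation -> Collections.singletonList(invocation.getArgument(0)));
      }
    }
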
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java
index 9f2e848f3..a9583a3f6 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java
@@ -1,15 +1,28 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class RowBufferStatsTest {
+  @Parameterized.Parameters(name = "enableNDVAndNV: {0}")
+  public static Object[] enableNDVAndNV() {
+    return new Object[] {false, true};
+  }
+
+  @Parameterized.Parameter public static boolean enableNDVAndNV;
 
   @Test
   public void testEmptyState() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     Assert.assertNull(stats.getCollationDefinitionString());
     Assert.assertNull(stats.getCurrentMinRealValue());
@@ -18,32 +31,57 @@ public void testEmptyState() throws Exception {
     Assert.assertNull(stats.getCurrentMaxStrValue());
     Assert.assertNull(stats.getCurrentMinIntValue());
     Assert.assertNull(stats.getCurrentMaxIntValue());
-    Assert.assertEquals(0, stats.getCurrentNullCount());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(0, stats.getDistinctValues());
+      Assert.assertEquals(0, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
   }
 
   @Test
   public void testMinMaxStrNonCol() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     stats.addStrValue("bob");
     Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue());
     Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(1, stats.getDistinctValues());
+      Assert.assertEquals(1, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addStrValue("charlie");
     Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue());
     Assert.assertArrayEquals(
         "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(2, stats.getDistinctValues());
+      Assert.assertEquals(2, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addStrValue("alice");
     Assert.assertArrayEquals(
         "alice".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue());
     Assert.assertArrayEquals(
         "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(3, stats.getDistinctValues());
+      Assert.assertEquals(3, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     Assert.assertNull(stats.getCurrentMinRealValue());
     Assert.assertNull(stats.getCurrentMaxRealValue());
@@ -55,22 +93,43 @@ public void testMinMaxStrNonCol() throws Exception {
 
   @Test
   public void testMinMaxInt() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     stats.addIntValue(BigInteger.valueOf(5));
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(1, stats.getDistinctValues());
+      Assert.assertEquals(1, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addIntValue(BigInteger.valueOf(6));
     Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(2, stats.getDistinctValues());
+      Assert.assertEquals(2, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addIntValue(BigInteger.valueOf(4));
     Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(3, stats.getDistinctValues());
+      Assert.assertEquals(3, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     Assert.assertNull(stats.getCurrentMinRealValue());
     Assert.assertNull(stats.getCurrentMaxRealValue());
@@ -82,22 +141,43 @@ public void testMinMaxInt() throws Exception {
 
   @Test
   public void testMinMaxReal() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     stats.addRealValue(1.0);
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(1, stats.getDistinctValues());
+      Assert.assertEquals(1, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addRealValue(1.5);
     Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(2, stats.getDistinctValues());
+      Assert.assertEquals(2, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     stats.addRealValue(.8);
     Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, stats.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(3, stats.getDistinctValues());
+      Assert.assertEquals(3, stats.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, stats.getDistinctValues());
+      Assert.assertNull(stats.getNumberOfValues());
+    }
 
     Assert.assertNull(stats.getCurrentMinIntValue());
     Assert.assertNull(stats.getCurrentMaxIntValue());
@@ -109,7 +189,7 @@ public void testMinMaxReal() throws Exception {
 
   @Test
   public void testIncCurrentNullCount() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     Assert.assertEquals(0, stats.getCurrentNullCount());
     stats.incCurrentNullCount();
@@ -120,7 +200,7 @@ public void testIncCurrentNullCount() throws Exception {
 
   @Test
   public void testMaxLength() throws Exception {
-    RowBufferStats stats = new RowBufferStats("COL1");
+    RowBufferStats stats = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     Assert.assertEquals(0, stats.getCurrentMaxLength());
     stats.setCurrentMaxLength(100L);
@@ -132,8 +212,8 @@ public void testMaxLength() throws Exception {
   @Test
   public void testGetCombinedStats() throws Exception {
     // Test for Integers
-    RowBufferStats one = new RowBufferStats("COL1");
-    RowBufferStats two = new RowBufferStats("COL1");
+    RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
+    RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addIntValue(BigInteger.valueOf(2));
     one.addIntValue(BigInteger.valueOf(4));
@@ -150,17 +230,24 @@ public void testGetCombinedStats() throws Exception {
     RowBufferStats result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
-    Assert.assertEquals(2, result.getCurrentNullCount());
 
+    if (enableNDVAndNV) {
+      Assert.assertEquals(7, result.getDistinctValues());
+      Assert.assertEquals(8, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
+
+    Assert.assertEquals(2, result.getCurrentNullCount());
     Assert.assertNull(result.getCurrentMinStrValue());
     Assert.assertNull(result.getCurrentMaxStrValue());
     Assert.assertNull(result.getCurrentMinRealValue());
     Assert.assertNull(result.getCurrentMaxRealValue());
 
     // Test for Reals
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
+    two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addRealValue(2d);
     one.addRealValue(4d);
@@ -175,9 +262,16 @@ public void testGetCombinedStats() throws Exception {
     result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
-    Assert.assertEquals(0, result.getCurrentNullCount());
 
+    if (enableNDVAndNV) {
+      Assert.assertEquals(7, result.getDistinctValues());
+      Assert.assertEquals(8, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
+
+    Assert.assertEquals(0, result.getCurrentNullCount());
     Assert.assertNull(result.getCollationDefinitionString());
     Assert.assertNull(result.getCurrentMinStrValue());
     Assert.assertNull(result.getCurrentMaxStrValue());
@@ -185,8 +279,8 @@ public void testGetCombinedStats() throws Exception {
     Assert.assertNull(result.getCurrentMaxIntValue());
 
     // Test for Strings without collation
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
+    two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addStrValue("alpha");
     one.addStrValue("d");
@@ -205,10 +299,17 @@ public void testGetCombinedStats() throws Exception {
     result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertArrayEquals("a".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue());
     Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
     Assert.assertEquals(2, result.getCurrentNullCount());
     Assert.assertEquals(5, result.getCurrentMaxLength());
 
+    if (enableNDVAndNV) {
+      Assert.assertEquals(7, result.getDistinctValues());
+      Assert.assertEquals(8, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
+
     Assert.assertNull(result.getCurrentMinRealValue());
     Assert.assertNull(result.getCurrentMaxRealValue());
     Assert.assertNull(result.getCurrentMinIntValue());
@@ -218,8 +319,8 @@ public void testGetCombinedStatsNull() throws Exception {
     // Test for Integers
-    RowBufferStats one = new RowBufferStats("COL1");
-    RowBufferStats two = new RowBufferStats("COL1");
+    RowBufferStats one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
+    RowBufferStats two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addIntValue(BigInteger.valueOf(2));
     one.addIntValue(BigInteger.valueOf(4));
@@ -231,7 +332,15 @@ public void testGetCombinedStatsNull() throws Exception {
     RowBufferStats result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(4, result.getDistinctValues());
+      Assert.assertEquals(4, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
+
     Assert.assertEquals(2, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinStrValue());
@@ -240,7 +349,7 @@ public void testGetCombinedStatsNull() throws Exception {
     Assert.assertNull(result.getCurrentMaxRealValue());
 
     // Test for Reals
-    one = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addRealValue(2d);
     one.addRealValue(4d);
@@ -250,7 +359,13 @@ public void testGetCombinedStatsNull() throws Exception {
     result = RowBufferStats.getCombinedStats(one, two);
     Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue());
     Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+    if (enableNDVAndNV) {
+      Assert.assertEquals(4, result.getDistinctValues());
+      Assert.assertEquals(4, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
     Assert.assertEquals(0, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinStrValue());
@@ -259,8 +374,8 @@ public void testGetCombinedStatsNull() throws Exception {
     Assert.assertNull(result.getCurrentMaxIntValue());
 
     // Test for Strings
-    one = new RowBufferStats("COL1");
-    two = new RowBufferStats("COL1");
+    one = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
+    two = new RowBufferStats("COL1", enableNDVAndNV, enableNDVAndNV);
 
     one.addStrValue("alpha");
     one.addStrValue("d");
@@ -272,7 +387,14 @@ public void testGetCombinedStatsNull() throws Exception {
     Assert.assertArrayEquals(
         "alpha".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue());
     Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue());
-    Assert.assertEquals(-1, result.getDistinctValues());
+
+    if (enableNDVAndNV) {
+      Assert.assertEquals(4, result.getDistinctValues());
+      Assert.assertEquals(4, result.getNumberOfValues().longValue());
+    } else {
+      Assert.assertEquals(-1, result.getDistinctValues());
+      Assert.assertNull(result.getNumberOfValues());
+    }
     Assert.assertEquals(1, result.getCurrentNullCount());
 
     Assert.assertNull(result.getCurrentMinRealValue());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index e1cb764dd..a8fc965b7 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import static java.time.ZoneOffset.UTC;
@@ -42,7 +46,7 @@ public class RowBufferTest {
 
   @Parameterized.Parameters(name = "isIcebergMode: {0}")
   public static Object[] isIcebergMode() {
-    return new Object[] {false, true};
+    return new Object[] {true};
   }
 
   @Parameterized.Parameter public static boolean isIcebergMode;
@@ -154,6 +158,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
             Constants.BdecParquetCompression.GZIP,
             ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT,
             isIcebergMode ? Optional.of(1) : Optional.empty(),
+            isIcebergMode,
+            isIcebergMode,
             isIcebergMode),
         null,
         isIcebergMode
@@ -572,7 +578,9 @@ public void testBuildEpInfoFromStats() {
     RowBufferStats stats1 =
         new RowBufferStats(
             "intColumn",
-            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"));
+            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"),
+            isIcebergMode,
+            isIcebergMode);
     stats1.addIntValue(BigInteger.valueOf(2));
     stats1.addIntValue(BigInteger.valueOf(10));
     stats1.addIntValue(BigInteger.valueOf(1));
@@ -580,7 +588,9 @@ public void testBuildEpInfoFromStats() {
     RowBufferStats stats2 =
         new RowBufferStats(
             "strColumn",
-            Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"));
+            Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"),
+            isIcebergMode,
+            isIcebergMode);
     stats2.addStrValue("alice");
     stats2.addStrValue("bob");
     stats2.incCurrentNullCount();
@@ -588,12 +598,13 @@ public void testBuildEpInfoFromStats() {
     colStats.put("intColumn", stats1);
     colStats.put("strColumn", stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
+    EpInfo result =
+        AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode, isIcebergMode);
     Map<String, FileColumnProperties> columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
 
     FileColumnProperties strColumnResult = columnResults.get("strColumn");
-    Assert.assertEquals(-1, strColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 2 : -1, strColumnResult.getDistinctValues());
     Assert.assertEquals(
         Hex.encodeHexString("alice".getBytes(StandardCharsets.UTF_8)),
         strColumnResult.getMinStrValue());
@@ -603,7 +614,7 @@ public void testBuildEpInfoFromStats() {
     Assert.assertEquals(1, strColumnResult.getNullCount());
 
     FileColumnProperties intColumnResult = columnResults.get("intColumn");
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 3 : -1, intColumnResult.getDistinctValues());
     Assert.assertEquals(BigInteger.valueOf(1), intColumnResult.getMinIntValue());
     Assert.assertEquals(BigInteger.valueOf(10), intColumnResult.getMaxIntValue());
     Assert.assertEquals(0, intColumnResult.getNullCount());
@@ -618,23 +629,28 @@ public void testBuildEpInfoFromNullColumnStats() {
     RowBufferStats stats1 =
         new RowBufferStats(
             intColName,
-            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named(intColName));
+            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named(intColName),
+            isIcebergMode,
+            isIcebergMode);
     RowBufferStats stats2 =
         new RowBufferStats(
             realColName,
-            Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named(realColName));
+            Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named(realColName),
+            isIcebergMode,
+            isIcebergMode);
     stats1.incCurrentNullCount();
     stats2.incCurrentNullCount();
 
     colStats.put(intColName, stats1);
     colStats.put(realColName, stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
+    EpInfo result =
+        AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode, isIcebergMode);
     Map<String, FileColumnProperties> columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
 
     FileColumnProperties intColumnResult = columnResults.get(intColName);
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 0 : -1, intColumnResult.getDistinctValues());
     Assert.assertEquals(
         FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue());
     Assert.assertEquals(
@@ -643,7 +659,7 @@ public void testBuildEpInfoFromNullColumnStats() {
     Assert.assertEquals(0, intColumnResult.getMaxLength());
 
     FileColumnProperties realColumnResult = columnResults.get(realColName);
-    Assert.assertEquals(-1, intColumnResult.getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 0 : -1, realColumnResult.getDistinctValues());
     Assert.assertEquals(
         FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue());
     Assert.assertEquals(
@@ -659,7 +675,9 @@ public void testInvalidEPInfo() {
     RowBufferStats stats1 =
         new RowBufferStats(
             "intColumn",
-            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"));
+            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"),
+            isIcebergMode,
+            isIcebergMode);
     stats1.addIntValue(BigInteger.valueOf(2));
     stats1.addIntValue(BigInteger.valueOf(10));
     stats1.addIntValue(BigInteger.valueOf(1));
@@ -667,7 +685,9 @@ public void testInvalidEPInfo() {
     RowBufferStats stats2 =
         new RowBufferStats(
             "strColumn",
-            Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"));
+            Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"),
+            isIcebergMode,
+            isIcebergMode);
     stats2.addStrValue("alice");
     stats2.incCurrentNullCount();
     stats2.incCurrentNullCount();
@@ -676,7 +696,7 @@ public void testInvalidEPInfo() {
     colStats.put("intColumn", stats1);
     colStats.put("strColumn", stats2);
 
     try {
-      AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode);
+      AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode, isIcebergMode);
       fail("should fail when row count is smaller than null count.");
     } catch (SFException e) {
       Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
@@ -789,33 +809,36 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) {
     Assert.assertEquals(
         BigInteger.valueOf(10), columnEpStats.get("colTinyInt").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("colTinyInt").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("colTinyInt").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 2 : -1, columnEpStats.get("colTinyInt").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLTINYINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLTINYINT").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 1 : -1, columnEpStats.get("COLTINYINT").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(3), columnEpStats.get("COLSMALLINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(2), columnEpStats.get("COLSMALLINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLSMALLINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLSMALLINT").getDistinctValues());
+    Assert.assertEquals(
+        isIcebergMode ? 2 : -1, columnEpStats.get("COLSMALLINT").getDistinctValues());
 
     Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMaxIntValue());
     Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMinIntValue());
     Assert.assertEquals(1L, columnEpStats.get("COLINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLINT").getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 1 : -1, columnEpStats.get("COLINT").getDistinctValues());
 
     Assert.assertEquals(
         BigInteger.valueOf(40), columnEpStats.get("COLBIGINT").getCurrentMaxIntValue());
     Assert.assertEquals(
         BigInteger.valueOf(4), columnEpStats.get("COLBIGINT").getCurrentMinIntValue());
     Assert.assertEquals(0, columnEpStats.get("COLBIGINT").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLBIGINT").getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 2 : -1, columnEpStats.get("COLBIGINT").getDistinctValues());
 
     Assert.assertArrayEquals(
         "2".getBytes(StandardCharsets.UTF_8), columnEpStats.get("COLCHAR").getCurrentMinStrValue());
@@ -823,7 +846,7 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) {
         "alice".getBytes(StandardCharsets.UTF_8),
         columnEpStats.get("COLCHAR").getCurrentMaxStrValue());
     Assert.assertEquals(0, columnEpStats.get("COLCHAR").getCurrentNullCount());
-    Assert.assertEquals(-1, columnEpStats.get("COLCHAR").getDistinctValues());
+    Assert.assertEquals(isIcebergMode ? 2 : -1, columnEpStats.get("COLCHAR").getDistinctValues());
 
     // Confirm we reset
     ChannelData resetResults = rowBuffer.flush();
@@ -1102,7 +1125,9 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o
     colBinary.setLogicalType("BINARY");
     colBinary.setLength(8 * 1024 * 1024);
     colBinary.setByteLength(8 * 1024 * 1024);
-    colBinary.setSourceIcebergDataType("\"binary\"");
+    if (isIcebergMode) {
+      colBinary.setSourceIcebergDataType("\"binary\"");
+    }
     byte[] arr = new byte[8 * 1024 * 1024];
 
     innerBuffer.setupSchema(Collections.singletonList(colBinary));
@@ -1335,6 +1360,9 @@ private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption
     colBoolean.setNullable(true);
     colBoolean.setLogicalType("BOOLEAN");
     colBoolean.setScale(0);
+    if (isIcebergMode) {
+      colBoolean.setSourceIcebergDataType("\"boolean\"");
+    }
 
     innerBuffer.setupSchema(Collections.singletonList(colBoolean));
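buildEpInfoFromStats now takes the enableDistinctValuesCount flag as a fourth argument, and in Iceberg mode the branches above expect real NDVs in the resulting FileColumnProperties. A hedged usage sketch that mirrors testBuildEpInfoFromStats; the inline flag-name comments are assumptions based on the test semantics:

    package net.snowflake.ingest.streaming.internal;

    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    class EpInfoUsageSketch {
      static EpInfo build() {
        Map<String, RowBufferStats> colStats = new HashMap<>();
        RowBufferStats intStats =
            new RowBufferStats(
                "intColumn",
                Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"),
                true /* enableDistinctValuesCount, assumed name */,
                true /* enableValuesCount, assumed name */);
        intStats.addIntValue(BigInteger.valueOf(1));
        intStats.addIntValue(BigInteger.valueOf(10));
        colStats.put("intColumn", intStats);

        // Iceberg-mode convention from the tests: setAllDefaultValues = false,
        // enableDistinctValuesCount = true.
        EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, false, true);
        // epInfo.getColumnEps().get("intColumn").getDistinctValues() would be 2 here.
        return epInfo;
      }
    }
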
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
index 64db57c31..a90e26e9d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import static java.time.ZoneOffset.UTC;
@@ -29,7 +33,7 @@ public void parseValueFixedSB1ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
             12,
@@ -61,7 +65,7 @@ public void parseValueFixedSB2ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            1234,
@@ -93,7 +97,7 @@ public void parseValueFixedSB4ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            123456789,
@@ -125,7 +129,7 @@ public void parseValueFixedSB8ToInt64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            123456789987654321L,
@@ -157,7 +161,7 @@ public void parseValueFixedSB16ToByteArray() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            new BigDecimal("91234567899876543219876543211234567891"),
@@ -191,7 +195,7 @@ public void parseValueFixedDecimalToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            new BigDecimal("12345.54321"),
@@ -221,7 +225,7 @@ public void parseValueDouble() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            12345.54321d,
@@ -251,7 +255,7 @@ public void parseValueBoolean() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            true,
@@ -281,7 +285,7 @@ public void parseValueBinary() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "1234abcd".getBytes(),
@@ -326,7 +330,7 @@ private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonPa
     String var =
         "{\"key1\":-879869596,\"key2\":\"value2\",\"key3\":null,"
             + "\"key4\":{\"key41\":0.032437,\"key42\":\"value42\",\"key43\":null}}";
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            var,
@@ -376,7 +380,7 @@ private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsin
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            var,
@@ -417,7 +421,7 @@ public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) {
     input.put("b", "2");
     input.put("c", "3");
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            input,
@@ -455,7 +459,7 @@ public void parseValueTextToBinary() {
     String text = "This is a sample text! Length is bigger than 32 bytes :)";
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            text,
@@ -492,7 +496,7 @@ public void parseValueTimestampNtzSB4Error() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     SFException exception =
         Assert.assertThrows(
             SFException.class,
@@ -520,7 +524,7 @@ public void parseValueTimestampNtzSB8ToINT64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "2013-04-28T20:57:01.000",
@@ -551,7 +555,7 @@ public void parseValueTimestampNtzSB16ToByteArray() {
             .scale(9) // nanos
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "2022-09-18T22:05:07.123456789",
@@ -583,7 +587,7 @@ public void parseValueDateToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "2021-01-01",
@@ -614,7 +618,7 @@ public void parseValueTimeSB4ToInt32() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "01:00:00",
@@ -645,7 +649,7 @@ public void parseValueTimeSB8ToInt64() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     ParquetBufferValue pv =
         SnowflakeParquetValueParser.parseColumnValueToParquet(
            "01:00:00.123",
@@ -676,7 +680,7 @@ public void parseValueTimeSB16Error() {
             .nullable(true)
             .build();
 
-    RowBufferStats rowBufferStats = new RowBufferStats("COL1");
+    RowBufferStats rowBufferStats = new RowBufferStats("COL1", false, false);
     SFException exception =
         Assert.assertThrows(
             SFException.class,
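In the Snowflake-table parser tests above, every stats object is now built with both new flags off, which preserves the legacy behavior. A small sketch of the contract those (name, false, false) call sites rely on, restating the disabled branch asserted in RowBufferStatsTest:

    package net.snowflake.ingest.streaming.internal;

    import java.math.BigInteger;
    import org.junit.Assert;
    import org.junit.Test;

    public class DisabledNdvNvContractSketch {
      @Test
      public void disabledFlagsKeepLegacySentinels() {
        // Both flags off: min/max are still tracked, NDV/NV are not.
        RowBufferStats stats = new RowBufferStats("COL1", false, false);
        stats.addIntValue(BigInteger.valueOf(42));
        Assert.assertEquals(-1, stats.getDistinctValues()); // NDV sentinel
        Assert.assertNull(stats.getNumberOfValues()); // NV not reported
      }
    }
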
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index 627593e82..e3f563fb6 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -506,8 +510,12 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
     columnEps.put(
         "column",
         new RowBufferStats(
-            "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode);
+            "COL1",
+            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"),
+            isIcebergMode,
+            isIcebergMode));
+    EpInfo epInfo =
+        AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode, isIcebergMode);
 
     ChunkMetadata chunkMetadata =
         ChunkMetadata.builder()
@@ -558,8 +562,12 @@ private Pair<List<BlobMetadata>, Set<ChunkRegisterStatus>> getRetryBlobMetadata(
     columnEps.put(
         "column",
         new RowBufferStats(
-            "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode);
+            "COL1",
+            Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"),
+            isIcebergMode,
+            isIcebergMode));
+    EpInfo epInfo =
+        AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode, isIcebergMode);
 
     ChannelMetadata channelMetadata1 =
         ChannelMetadata.builder()
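SubColumnFinderTest below validates getSubColumns against a brute-force reference: a dot path matches every leaf column whose own dot path equals it or continues past a '.' boundary (so "LIST_COL.list" matches "LIST_COL.list.element", while a bare prefix such as "LIST" matches nothing). A naive sketch of that behavior, assuming only what the test exercises; the shipped SubColumnFinder added in src/main by this patch presumably precomputes an index rather than scanning per query:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.MessageType;

    // Illustrative reference implementation, mirroring the test's findSubColumn helper.
    final class NaiveSubColumnFinder {
      private final List<String> leafPaths = new ArrayList<>();

      NaiveSubColumnFinder(MessageType schema) {
        for (ColumnDescriptor column : schema.getColumns()) {
          leafPaths.add(String.join(".", column.getPath())); // leaf dot paths
        }
      }

      List<String> getSubColumns(String dotPath) {
        List<String> result = new ArrayList<>();
        for (String leaf : leafPaths) {
          // Match only at a '.' boundary, or an exact leaf match.
          if (leaf.startsWith(dotPath)
              && (leaf.length() == dotPath.length() || leaf.charAt(dotPath.length()) == '.')) {
            result.add(leaf);
          }
        }
        return result;
      }
    }
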
diff --git a/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java b/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java
new file mode 100644
index 000000000..b5c538d00
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.utils;
+
+import static net.snowflake.ingest.utils.Utils.concatDotPath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
+
+public class SubColumnFinderTest {
+
+  @Test
+  public void testFlatSchema() {
+    MessageType schema =
+        MessageTypeParser.parseMessageType(
+            "message schema {\n"
+                + "  optional boolean BOOLEAN_COL = 1;\n"
+                + "  optional int32 INT_COL = 2;\n"
+                + "  optional int64 LONG_COL = 3;\n"
+                + "  optional float FLOAT_COL = 4;\n"
+                + "  optional double DOUBLE_COL = 5;\n"
+                + "  optional int64 DECIMAL_COL (DECIMAL(10,5)) = 6;\n"
+                + "  optional binary STRING_COL (STRING) = 7;\n"
+                + "  optional fixed_len_byte_array(10) FIXED_COL = 8;\n"
+                + "  optional binary BINARY_COL = 9;\n"
+                + "  optional int32 DATE_COL (DATE) = 10;\n"
+                + "  optional int64 TIME_COL (TIME(MICROS,false)) = 11;\n"
+                + "  optional int64 TIMESTAMP_NTZ_COL (TIMESTAMP(MICROS,false)) = 12;\n"
+                + "  optional int64 TIMESTAMP_LTZ_COL (TIMESTAMP(MICROS,true)) = 13;\n"
+                + "}\n");
+    assertFindSubColumns(schema);
+  }
+
+  @Test
+  public void testNestedSchema() {
+    MessageType schema =
+        MessageTypeParser.parseMessageType(
+            "message schema {\n"
+                + "  optional group LIST_COL (LIST) = 1 {\n"
+                + "    repeated group list {\n"
+                + "      optional group element = 4 {\n"
+                + "        optional group map_col (MAP) = 5 {\n"
+                + "          repeated group key_value {\n"
+                + "            required binary key (STRING) = 6;\n"
+                + "            optional group value (LIST) = 7 {\n"
+                + "              repeated group list {\n"
+                + "                optional group element = 8 {\n"
+                + "                  optional int32 int_col = 9;\n"
+                + "                  optional boolean boolean_col = 10;\n"
+                + "                  optional group map_col (MAP) = 11 {\n"
+                + "                    repeated group key_value {\n"
+                + "                      required int32 key = 12;\n"
+                + "                      optional int32 value = 13;\n"
+                + "                    }\n"
+                + "                  }\n"
+                + "                }\n"
+                + "              }\n"
+                + "            }\n"
+                + "          }\n"
+                + "        }\n"
+                + "        optional group obj_col = 14 {\n"
+                + "          optional group list_col (LIST) = 15 {\n"
+                + "            repeated group list {\n"
+                + "              optional int32 element = 16;\n"
+                + "            }\n"
+                + "          }\n"
+                + "          optional group map_col (MAP) = 17 {\n"
+                + "            repeated group key_value {\n"
+                + "              required binary key (STRING) = 18;\n"
+                + "              optional binary value (STRING) = 19;\n"
+                + "            }\n"
+                + "          }\n"
+                + "        }\n"
+                + "        optional int32 int_col = 20;\n"
+                + "        optional float float_col = 21;\n"
+                + "      }\n"
+                + "    }\n"
+                + "  }\n"
+                + "  optional group OBJ_COL = 2 {\n"
+                + "    optional group obj_col = 22 {\n"
+                + "      optional group map_col (MAP) = 23 {\n"
+                + "        repeated group key_value {\n"
+                + "          required int32 key = 24;\n"
+                + "          optional int32 value = 25;\n"
+                + "        }\n"
+                + "      }\n"
+                + "    }\n"
+                + "  }\n"
+                + "  optional double DOUBLE_COL = 3;\n"
+                + "}");
+    assertFindSubColumns(schema);
+  }
+
+  private void assertFindSubColumns(MessageType schema) {
+    SubColumnFinder subColumnFinder = new SubColumnFinder(schema);
+    for (String dotPath : getAllPossibleDotPath(schema)) {
+      assertThat(subColumnFinder.getSubColumns(dotPath))
+          .usingRecursiveComparison()
+          .ignoringCollectionOrder()
+          .isEqualTo(findSubColumn(schema, dotPath));
+    }
+  }
+
+  private Iterable<String> getAllPossibleDotPath(MessageType schema) {
+    Set<String> dotPaths = new HashSet<>();
+    for (ColumnDescriptor column : schema.getColumns()) {
+      String[] path = column.getPath();
+      if (path.length == 0) {
+        continue;
+      }
+      String dotPath = path[0];
+      dotPaths.add(dotPath);
+      for (int i = 1; i < path.length; i++) {
+        dotPath = concatDotPath(dotPath, path[i]);
+        dotPaths.add(dotPath);
+      }
+    }
+    return dotPaths;
+  }
+
+  private List<String> findSubColumn(MessageType schema, String dotPath) {
+    return schema.getColumns().stream()
+        .map(ColumnDescriptor::getPath)
+        .map(Utils::concatDotPath)
+        .filter(
+            s ->
+                s.startsWith(dotPath)
+                    && (s.length() == dotPath.length() || s.charAt(dotPath.length()) == '.'))
+        .collect(Collectors.toList());
+  }
+}