diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 62c5ca6c7..747ec22dc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -658,7 +658,10 @@ static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> co
     return epInfo;
   }
 
-  static EpInfo buildEPInfoFromBlocksMetadata(List<BlockMetaData> blocksMetadata) {
+  static EpInfo buildEPInfoFromParquetWriterResults(
+      List<BlockMetaData> blocksMetadata,
+      Map<String, Integer> ndvStats,
+      Map<String, Integer> maxLengthStats) {
     if (blocksMetadata.isEmpty()) {
       throw new SFException(ErrorCode.INTERNAL_ERROR, "No blocks metadata found");
     }
@@ -690,7 +693,9 @@ static EpInfo buildEPInfoFromBlocksMetadata(List<BlockMetaData> blocksMetadata)
           new FileColumnProperties(
               columnOrdinal,
               columnChunkMetaData.getPrimitiveType().getId().intValue(),
-              mergedStatistics.get(subColumnName));
+              mergedStatistics.get(subColumnName),
+              ndvStats.getOrDefault(subColumnName, 0),
+              maxLengthStats.getOrDefault(subColumnName, 0));
       epInfo.getColumnEps().put(subColumnName, dto);
     }
 
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 107deecca..b6be2b58b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -135,8 +135,10 @@ static Blob constructBlobAndMetadata(
             .setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
             .setEpInfo(
                 isIceberg
-                    ? AbstractRowBuffer.buildEPInfoFromBlocksMetadata(
-                        serializedChunk.blocksMetadata)
+                    ? AbstractRowBuffer.buildEPInfoFromParquetWriterResults(
+                        serializedChunk.blocksMetadata,
+                        serializedChunk.ndvStats,
+                        serializedChunk.maxLengthStats)
                     : AbstractRowBuffer.buildEpInfoFromStats(
                         serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
             .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
index 7b42dbc5e..da4453b7d 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
  */
 
 package net.snowflake.ingest.streaming.internal;
@@ -200,4 +200,14 @@ Long getFirstInsertTimeInMs() {
   Long getLastInsertTimeInMs() {
     return this.lastInsertTimeInMs;
   }
+
+  //  @JsonProperty("major_vers")
+  //  Integer getMajorVersion() {
+  //    return PARQUET_MAJOR_VERSION;
+  //  }
+  //
+  //  @JsonProperty("minor_vers")
+  //  Integer getMinorVersion() {
+  //    return PARQUET_MINOR_VERSION;
+  //  }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
index e27eb500b..23e3dc755 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
@@ -5,7 +5,6 @@
 package net.snowflake.ingest.streaming.internal;
 
 import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
-import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -99,11 +98,12 @@ class FileColumnProperties {
     this.setDistinctValues(stats.getDistinctValues());
   }
 
-  FileColumnProperties(int columnOrdinal, int fieldId, Statistics statistics) {
+  FileColumnProperties(
+      int columnOrdinal, int fieldId, Statistics statistics, int ndv, int maxLength) {
     this.setColumnOrdinal(columnOrdinal);
     this.setFieldId(fieldId);
     this.setNullCount(statistics.getNumNulls());
-    this.setDistinctValues(EP_NDV_UNKNOWN);
+    this.setDistinctValues(ndv);
     this.setCollation(null);
     this.setMaxStrNonCollated(null);
     this.setMinStrNonCollated(null);
@@ -127,6 +127,7 @@ class FileColumnProperties {
     } else {
       this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false));
       this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true));
+      this.setMaxLength(maxLength);
     }
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
index 22726c397..3aa07b125 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -38,6 +38,8 @@ class SerializationResult {
     final ByteArrayOutputStream chunkData;
     final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
     final List<BlockMetaData> blocksMetadata;
+    final Map<String, Integer> ndvStats;
+    final Map<String, Integer> maxLengthStats;
 
     public SerializationResult(
         List<ChannelMetadata> channelsMetadataList,
@@ -46,7 +48,9 @@ public SerializationResult(
         float chunkEstimatedUncompressedSize,
         ByteArrayOutputStream chunkData,
         Pair<Long, Long> chunkMinMaxInsertTimeInMs,
-        List<BlockMetaData> blocksMetadata) {
+        List<BlockMetaData> blocksMetadata,
+        Map<String, Integer> ndvStats,
+        Map<String, Integer> maxLengthStats) {
       this.channelsMetadataList = channelsMetadataList;
       this.columnEpStatsMapCombined = columnEpStatsMapCombined;
       this.rowCount = rowCount;
@@ -54,6 +58,8 @@ public SerializationResult(
       this.chunkData = chunkData;
       this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
       this.blocksMetadata = blocksMetadata;
+      this.ndvStats = ndvStats;
+      this.maxLengthStats = maxLengthStats;
     }
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index d86eb738f..b15a7fea5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -26,6 +26,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private static final Logging logger = new Logging(ParquetFlusher.class);
   private final MessageType schema;
   private final long maxChunkSizeInBytes;
+  private final boolean enableNdvAndMaxLengthStats;
 
   private final Constants.BdecParquetCompression bdecParquetCompression;
 
@@ -33,10 +34,12 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   public ParquetFlusher(
       MessageType schema,
       long maxChunkSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression) {
+      Constants.BdecParquetCompression bdecParquetCompression,
+      boolean enableNdvAndMaxLengthStats) {
     this.schema = schema;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
+    this.enableNdvAndMaxLengthStats = enableNdvAndMaxLengthStats;
   }
 
   @Override
@@ -124,7 +127,8 @@ private SerializationResult serializeFromJavaObjects(
             metadata,
             firstChannelFullyQualifiedTableName,
             maxChunkSizeInBytes,
-            bdecParquetCompression);
+            bdecParquetCompression,
+            this.enableNdvAndMaxLengthStats);
 
     rows.forEach(parquetWriter::writeRow);
     parquetWriter.close();
@@ -137,7 +141,9 @@ private SerializationResult serializeFromJavaObjects(
         chunkEstimatedUncompressedSize,
         mergedData,
         chunkMinMaxInsertTimeInMs,
-        parquetWriter.getBlocksMetadata());
+        parquetWriter.getBlocksMetadata(),
+        parquetWriter.getNdvStats(),
+        parquetWriter.getMaxLengthStats());
   }
 
   /**
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 1394649d3..d462005d3 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -290,6 +290,7 @@ public Flusher<ParquetChunkData> createFlusher() {
     return new ParquetFlusher(
         schema,
         clientBufferParameters.getMaxChunkSizeInBytes(),
-        clientBufferParameters.getBdecParquetCompression());
+        clientBufferParameters.getBdecParquetCompression(),
+        this.clientBufferParameters.getIsIcebergMode());
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 83394abf7..7d3a1b88b 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -70,6 +70,9 @@ public class Constants {
   public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
   public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";
 
+  public static final int PARQUET_MAJOR_VERSION = 1;
+  public static final int PARQUET_MINOR_VERSION = 0;
+
   public enum WriteMode {
     CLOUD_STORAGE,
     REST_API,
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
index fab65c05f..7e59e2f07 100644
--- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
+++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -7,6 +7,8 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import net.snowflake.ingest.utils.Constants;
@@ -38,6 +40,7 @@ public class BdecParquetWriter implements AutoCloseable {
   private final InternalParquetRecordWriter<List<Object>> writer;
   private final CodecFactory codecFactory;
+  private final BdecWriteSupport writeSupport;
 
   private long rowsWritten = 0;
 
   /**
@@ -47,6 +50,7 @@ public class BdecParquetWriter implements AutoCloseable {
    * @param schema row schema
    * @param extraMetaData extra metadata
    * @param channelName name of the channel that is using the writer
+   * @param enableNdvAndMaxLengthStats whether to record NDV and MaxLength
    * @throws IOException
    */
   public BdecParquetWriter(
@@ -55,13 +59,14 @@ public BdecParquetWriter(
       Map<String, String> extraMetaData,
       String channelName,
       long maxChunkSizeInBytes,
-      Constants.BdecParquetCompression bdecParquetCompression)
+      Constants.BdecParquetCompression bdecParquetCompression,
+      boolean enableNdvAndMaxLengthStats)
       throws IOException {
     OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
     ParquetProperties encodingProps = createParquetProperties();
     Configuration conf = new Configuration();
-    WriteSupport<List<Object>> writeSupport =
-        new BdecWriteSupport(schema, extraMetaData, channelName);
+    writeSupport =
+        new BdecWriteSupport(schema, extraMetaData, channelName, enableNdvAndMaxLengthStats);
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     ParquetFileWriter fileWriter =
@@ -117,6 +122,19 @@ public List<BlockMetaData> getBlocksMetadata() {
     return writer.getFooter().getBlocks();
   }
 
+  public Map<String, Integer> getMaxLengthStats() {
+    return writeSupport.maxLengthStats;
+  }
+
+  public Map<String, Integer> getNdvStats() {
+    if (!writeSupport.enableNdvAndMaxLengthStats) {
+      return null;
+    }
+    Map<String, Integer> ndvStats = new HashMap<>();
+    writeSupport.ndvStatsSet.forEach((k, v) -> ndvStats.put(k, v.size()));
+    return ndvStats;
+  }
+
   public void writeRow(List<Object> row) {
     try {
       writer.write(row);
@@ -249,16 +267,28 @@ private static class BdecWriteSupport extends WriteSupport<List<Object>> {
     RecordConsumer recordConsumer;
     Map<String, String> extraMetadata;
     private final String channelName;
+    boolean enableNdvAndMaxLengthStats;
+    Map<String, Set<Object>> ndvStatsSet;
+    Map<String, Integer> maxLengthStats;
 
     // TODO SNOW-672156: support specifying encodings and compression
-    BdecWriteSupport(MessageType schema, Map<String, String> extraMetadata, String channelName) {
+    BdecWriteSupport(
+        MessageType schema,
+        Map<String, String> extraMetadata,
+        String channelName,
+        boolean enableNdvAndMaxLengthStats) {
       this.schema = schema;
       this.extraMetadata = extraMetadata;
       this.channelName = channelName;
+      this.enableNdvAndMaxLengthStats = enableNdvAndMaxLengthStats;
     }
 
     @Override
     public WriteContext init(Configuration config) {
+      if (enableNdvAndMaxLengthStats) {
+        ndvStatsSet = new HashMap<>();
+        maxLengthStats = new HashMap<>();
+      }
       return new WriteContext(schema, extraMetadata);
     }
 
@@ -284,20 +314,24 @@ public void write(List<Object> values) {
                 + values);
       }
       recordConsumer.startMessage();
-      writeValues(values, schema);
+      writeValues(values, schema, "");
       recordConsumer.endMessage();
     }
 
-    private void writeValues(List<Object> values, GroupType type) {
+    private void writeValues(List<Object> values, GroupType type, String path) {
       List<Type> cols = type.getFields();
       for (int i = 0; i < cols.size(); ++i) {
         Object val = values.get(i);
         if (val != null) {
           String fieldName = cols.get(i).getName();
+          String currentPath = !path.isEmpty() ? path + "." + fieldName : fieldName;
           recordConsumer.startField(fieldName, i);
           if (cols.get(i).isPrimitive()) {
             PrimitiveType.PrimitiveTypeName typeName =
                 cols.get(i).asPrimitiveType().getPrimitiveTypeName();
+            if (enableNdvAndMaxLengthStats) {
+              ndvStatsSet.computeIfAbsent(currentPath, k -> new HashSet<>()).add(val);
+            }
             switch (typeName) {
               case BOOLEAN:
                 recordConsumer.addBoolean((boolean) val);
@@ -320,6 +354,9 @@ private void writeValues(List<Object> values, GroupType type) {
                     ? Binary.fromString((String) val)
                     : Binary.fromConstantByteArray((byte[]) val);
                 recordConsumer.addBinary(binVal);
+                if (enableNdvAndMaxLengthStats) {
+                  maxLengthStats.merge(currentPath, binVal.length(), Math::max);
+                }
                 break;
               case FIXED_LEN_BYTE_ARRAY:
                 Binary binary = Binary.fromConstantByteArray((byte[]) val);
@@ -334,13 +371,13 @@ private void writeValues(List<Object> values, GroupType type) {
               for (Object o : values) {
                 recordConsumer.startGroup();
                 if (o != null) {
-                  writeValues((List<Object>) o, cols.get(i).asGroupType());
+                  writeValues((List<Object>) o, cols.get(i).asGroupType(), currentPath);
                 }
                 recordConsumer.endGroup();
               }
             } else {
               recordConsumer.startGroup();
-              writeValues((List<Object>) val, cols.get(i).asGroupType());
+              writeValues((List<Object>) val, cols.get(i).asGroupType(), currentPath);
               recordConsumer.endGroup();
             }
           }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 781e53959..6877dbc3e 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -69,7 +69,7 @@ private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(int metada
     String columnName = "C1";
     ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
     MessageType schema = createSchema(columnName);
-    Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP))
+    Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP, false))
         .when(channelData)
         .createFlusher();
 
@@ -82,7 +82,8 @@ private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(int metada
             new HashMap<>(),
             "CHANNEL",
             1000,
-            Constants.BdecParquetCompression.GZIP);
+            Constants.BdecParquetCompression.GZIP,
+            false);
     bdecParquetWriter.writeRow(Collections.singletonList("1"));
     channelData.setVectors(
        new ParquetChunkData(
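
The per-column NDV and max-length collection added to BdecWriteSupport above boils down to one value set and one running maximum per dot-separated column path. A minimal, self-contained sketch of that pattern follows; the class ColumnStatsSketch, its methods, and the sample paths are illustrative only and not part of the SDK:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the stats accumulation pattern used by the new
// BdecWriteSupport fields: NDV is derived from the size of a per-path value
// set, max length is kept with Map.merge(..., Math::max). Names are made up.
public class ColumnStatsSketch {
  private final Map<String, Set<Object>> ndvSets = new HashMap<>();
  private final Map<String, Integer> maxLengths = new HashMap<>();

  // Record one primitive value written under a dot-separated column path.
  void record(String columnPath, Object value, int serializedLength) {
    ndvSets.computeIfAbsent(columnPath, k -> new HashSet<>()).add(value);
    maxLengths.merge(columnPath, serializedLength, Math::max);
  }

  // Collapse the value sets into per-path distinct-value counts.
  Map<String, Integer> ndvStats() {
    Map<String, Integer> ndv = new HashMap<>();
    ndvSets.forEach((path, values) -> ndv.put(path, values.size()));
    return ndv;
  }

  Map<String, Integer> maxLengthStats() {
    return maxLengths;
  }

  public static void main(String[] args) {
    ColumnStatsSketch stats = new ColumnStatsSketch();
    stats.record("c1", "foo", 3);
    stats.record("c1", "foo", 3); // duplicate value: NDV for c1 stays 1
    stats.record("c1", "longer", 6); // max length for c1 becomes 6
    stats.record("nested.c2", 42, 4);
    System.out.println(stats.ndvStats()); // e.g. {c1=2, nested.c2=1} (order may vary)
    System.out.println(stats.maxLengthStats()); // e.g. {c1=6, nested.c2=4}
  }
}

Compared with the previous behavior of reporting EP_NDV_UNKNOWN, this trades extra memory (one set entry per distinct value per column) for an exact distinct count, which is why the diff gates it behind enableNdvAndMaxLengthStats and only enables it when ParquetRowBuffer is in Iceberg mode.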