From ae10eee5bc0dabc925d4add674550026f3cf4a1a Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 15 Oct 2024 15:48:16 -0700 Subject: [PATCH] fix npe --- .../streaming/internal/AbstractRowBuffer.java | 9 +- .../streaming/internal/BlobBuilder.java | 31 +++- .../streaming/internal/ChunkMetadata.java | 15 +- .../internal/FileColumnProperties.java | 18 ++- .../ingest/streaming/internal/Flusher.java | 5 +- .../streaming/internal/ParquetFlusher.java | 11 +- .../streaming/internal/ParquetRowBuffer.java | 3 - .../streaming/internal/RowBufferStats.java | 8 +- .../net/snowflake/ingest/utils/Constants.java | 2 + .../net/snowflake/ingest/utils/Utils.java | 23 --- .../parquet/hadoop/BdecParquetReader.java | 4 +- ...riter.java => SnowflakeParquetWriter.java} | 37 ++++- .../streaming/internal/BlobBuilderTest.java | 8 +- .../streaming/internal/ChannelDataTest.java | 18 +-- .../internal/RowBufferStatsTest.java | 149 ++++-------------- .../datatypes/AbstractDataTypeTest.java | 37 +++-- .../datatypes/IcebergNumericTypesIT.java | 6 +- .../datatypes/IcebergStructuredIT.java | 18 ++- 18 files changed, 189 insertions(+), 213 deletions(-) rename src/main/java/org/apache/parquet/hadoop/{BdecParquetWriter.java => SnowflakeParquetWriter.java} (91%) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 8e16b734d..69558d5d9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -292,6 +292,9 @@ public InsertValidationResponse insertRows( // Temp stats map to use until all the rows are validated @VisibleForTesting Map tempStatsMap; + // Map of the column name to the column object, used for null/missing column check + protected final Map fieldIndex; + // Lock used to protect the buffers from concurrent read/write private final Lock flushLock; @@ -352,6 
+355,8 @@ public InsertValidationResponse insertRows( // Initialize empty stats this.statsMap = new HashMap<>(); this.tempStatsMap = new HashMap<>(); + + this.fieldIndex = new HashMap<>(); } /** @@ -427,7 +432,7 @@ Set verifyInputColumns( List missingCols = new ArrayList<>(); for (String columnName : this.nonNullableFieldNames) { if (!inputColNamesMap.containsKey(columnName)) { - missingCols.add(statsMap.get(columnName).getColumnDisplayName()); + missingCols.add(fieldIndex.get(columnName).columnMetadata.getName()); } } @@ -447,7 +452,7 @@ Set verifyInputColumns( for (String columnName : this.nonNullableFieldNames) { if (inputColNamesMap.containsKey(columnName) && row.get(inputColNamesMap.get(columnName)) == null) { - nullValueNotNullCols.add(statsMap.get(columnName).getColumnDisplayName()); + nullValueNotNullCols.add(fieldIndex.get(columnName).columnMetadata.getName()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 73774b144..4fcc64ff2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -31,8 +31,8 @@ import net.snowflake.ingest.utils.Cryptor; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; -import net.snowflake.ingest.utils.Utils; import org.apache.commons.codec.binary.Hex; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.hadoop.ParquetFileWriter; /** @@ -91,7 +91,8 @@ static Blob constructBlobAndMetadata( final byte[] compressedChunkData; final int chunkLength; final int compressedChunkDataSize; - int extendedMetadataSize = -1; + long extendedMetadataSize = -1; + long metadataSize = -1; if (internalParameterProvider.getEnableChunkEncryption()) { Pair paddedChunk = @@ -115,7 +116,8 @@ static Blob constructBlobAndMetadata( compressedChunkDataSize = chunkLength; if 
(internalParameterProvider.setIcebergSpecificFieldsInEp()) { - extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength); + metadataSize = getExtendedMetadataSize(compressedChunkData); + extendedMetadataSize = serializedChunk.extendedMetadataSize; } } @@ -148,11 +150,12 @@ static Blob constructBlobAndMetadata( if (internalParameterProvider.setIcebergSpecificFieldsInEp()) { chunkMetadataBuilder - .setMajorVersion(ParquetFileWriter.CURRENT_VERSION) + .setMajorVersion(Constants.PARQUET_MAJOR_VERSION) .setMinorVersion(Constants.PARQUET_MINOR_VERSION) // set createdOn in seconds .setCreatedOn(System.currentTimeMillis() / 1000) - .setExtendedMetadataSize((long) extendedMetadataSize); + .setMetadataSize(metadataSize) + .setExtendedMetadataSize(extendedMetadataSize); } ChunkMetadata chunkMetadata = chunkMetadataBuilder.build(); @@ -298,4 +301,22 @@ static class Blob { this.blobStats = blobStats; } } + + /** + * Get the metadata size (footer size) from a parquet file + * + * @param bytes the serialized parquet file + * @return the metadata size (footer size) + */ + static long getExtendedMetadataSize(byte[] bytes) throws IOException { + final int magicOffset = bytes.length - ParquetFileWriter.MAGIC.length; + final int footerSizeOffset = magicOffset - Integer.BYTES; + if (footerSizeOffset < 0 + || !ParquetFileWriter.MAGIC_STR.equals( + new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) { + throw new IllegalArgumentException("Invalid parquet file"); + } + + return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java index 006782d25..0f27315f5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc.
All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -26,6 +26,7 @@ class ChunkMetadata { private Integer majorVersion; private Integer minorVersion; private Long createdOn; + private Long metadataSize; private Long extendedMetadataSize; static Builder builder() { @@ -51,6 +52,7 @@ static class Builder { private Integer majorVersion; private Integer minorVersion; private Long createdOn; + private Long metadataSize; private Long extendedMetadataSize; Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) { @@ -124,6 +126,11 @@ Builder setCreatedOn(Long createdOn) { return this; } + Builder setMetadataSize(Long metadataSize) { + this.metadataSize = metadataSize; + return this; + } + Builder setExtendedMetadataSize(Long extendedMetadataSize) { this.extendedMetadataSize = extendedMetadataSize; return this; @@ -258,6 +265,12 @@ Long getCreatedOn() { return this.createdOn; } + @JsonProperty("metadata_size") + @JsonInclude(JsonInclude.Include.NON_NULL) + Long getMetadataSize() { + return this.metadataSize; + } + @JsonProperty("ext_metadata_size") @JsonInclude(JsonInclude.Include.NON_NULL) Long getExtendedMetadataSize() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java index fcb758e31..1a22b5f31 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java @@ -45,7 +45,7 @@ class FileColumnProperties { private long nullCount; // for elements in repeated columns - private Long numberOfValues; + private long numberOfValues; // for binary or string columns private long maxLength; @@ -289,12 +289,12 @@ void setMaxStrNonCollated(String maxStrNonCollated) { } @JsonProperty("numberOfValues") - 
@JsonInclude(JsonInclude.Include.NON_DEFAULT) - Long getNumberOfValues() { + @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = IgnoreMinusOneFilter.class) + long getNumberOfValues() { return numberOfValues; } - void setNumberOfValues(Long numberOfValues) { + void setNumberOfValues(long numberOfValues) { this.numberOfValues = numberOfValues; } @@ -360,4 +360,14 @@ public int hashCode() { nullCount, maxLength); } + + static class IgnoreMinusOneFilter { + @Override + public boolean equals(Object obj) { + if (obj instanceof Long) { + return (Long) obj == -1; + } + return false; + } + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java index 241defdfc..5a426e873 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java @@ -36,6 +36,7 @@ class SerializationResult { final float chunkEstimatedUncompressedSize; final ByteArrayOutputStream chunkData; final Pair chunkMinMaxInsertTimeInMs; + final long extendedMetadataSize; public SerializationResult( List channelsMetadataList, @@ -43,13 +44,15 @@ public SerializationResult( long rowCount, float chunkEstimatedUncompressedSize, ByteArrayOutputStream chunkData, - Pair chunkMinMaxInsertTimeInMs) { + Pair chunkMinMaxInsertTimeInMs, + long extendedMetadataSize) { this.channelsMetadataList = channelsMetadataList; this.columnEpStatsMapCombined = columnEpStatsMapCombined; this.rowCount = rowCount; this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize; this.chunkData = chunkData; this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs; + this.extendedMetadataSize = extendedMetadataSize; } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 5b11996ec..e7272d94a 100644 --- 
a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -17,7 +17,7 @@ import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.BdecParquetWriter; +import org.apache.parquet.hadoop.SnowflakeParquetWriter; import org.apache.parquet.schema.MessageType; /** @@ -66,7 +66,7 @@ private SerializationResult serializeFromJavaObjects( String firstChannelFullyQualifiedTableName = null; Map columnEpStatsMapCombined = null; List> rows = null; - BdecParquetWriter parquetWriter; + SnowflakeParquetWriter parquetWriter; ByteArrayOutputStream mergedData = new ByteArrayOutputStream(); Pair chunkMinMaxInsertTimeInMs = null; @@ -129,7 +129,7 @@ private SerializationResult serializeFromJavaObjects( // http://go/streams-on-replicated-mixed-tables metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); parquetWriter = - new BdecParquetWriter( + new SnowflakeParquetWriter( mergedData, schema, metadata, @@ -150,7 +150,8 @@ private SerializationResult serializeFromJavaObjects( rowCount, chunkEstimatedUncompressedSize, mergedData, - chunkMinMaxInsertTimeInMs); + chunkMinMaxInsertTimeInMs, + parquetWriter.getExtendedMetadataSize()); } /** @@ -164,7 +165,7 @@ private SerializationResult serializeFromJavaObjects( * Used only for logging purposes if there is a mismatch. 
*/ private void verifyRowCounts( - BdecParquetWriter writer, + SnowflakeParquetWriter writer, long totalMetadataRowCount, List> channelsDataPerTable, long javaSerializationTotalRowCount) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index b972404f7..4e3afa7b7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -38,8 +38,6 @@ */ public class ParquetRowBuffer extends AbstractRowBuffer { - private final Map fieldIndex; - /* map that contains metadata like typeinfo for columns and other information needed by the server scanner */ private final Map metadata; @@ -72,7 +70,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer { clientBufferParameters, offsetTokenVerificationFunction, telemetryService); - this.fieldIndex = new HashMap<>(); this.metadata = new HashMap<>(); this.data = new ArrayList<>(); this.tempData = new ArrayList<>(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java index 131f603f0..d4e986cdb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN; +import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -48,7 +49,7 @@ class RowBufferStats { private final boolean enableDistinctValuesCount; private Set distinctValues; private final boolean enableValuesCount; - private Long numberOfValues; + private long numberOfValues; RowBufferStats( String columnDisplayName, @@ -310,9 +311,8 
@@ long getDistinctValues() { return enableDistinctValuesCount ? distinctValues.size() : EP_NDV_UNKNOWN; } - // TODO: change default to -1 after Oct 17 - Long getNumberOfValues() { - return enableValuesCount ? numberOfValues : null; + long getNumberOfValues() { + return enableValuesCount ? numberOfValues : EP_NV_UNKNOWN; } String getCollationDefinitionString() { diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 7198c7669..a5e04e21e 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -62,6 +62,7 @@ public class Constants { public static final int MAX_STREAMING_INGEST_API_CHANNEL_RETRY = 3; public static final int STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC = 10; public static final long EP_NDV_UNKNOWN = -1L; + public static final long EP_NV_UNKNOWN = -1L; public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3; public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024; public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024; @@ -72,6 +73,7 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; + public static final int PARQUET_MAJOR_VERSION = 1; public static final int PARQUET_MINOR_VERSION = 0; /** diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index becbfb46a..ff35d9022 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -8,7 +8,6 @@ import com.codahale.metrics.Timer; import io.netty.util.internal.PlatformDependent; -import java.io.IOException; import java.io.StringReader; import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; @@ -30,8 +29,6 @@ import 
java.util.Properties; import net.snowflake.client.core.SFSessionProperty; import org.apache.commons.codec.binary.Base64; -import org.apache.parquet.bytes.BytesUtils; -import org.apache.parquet.hadoop.ParquetFileWriter; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; @@ -433,24 +430,4 @@ public static String concatDotPath(String... path) { } return sb.toString(); } - - /** - * Get the extended metadata size (footer size) from a parquet file - * - * @param bytes the serialized parquet file - * @param length the length of the byte array without padding - * @return the extended metadata size - */ - public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException { - final int magicOffset = length - ParquetFileWriter.MAGIC.length; - final int footerSizeOffset = magicOffset - Integer.BYTES; - if (bytes.length < length - || footerSizeOffset < 0 - || !ParquetFileWriter.MAGIC_STR.equals( - new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) { - throw new IllegalArgumentException("Invalid parquet file"); - } - - return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset); - } } diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java index ef95fab14..86dae40af 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetReader.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. 
*/ package org.apache.parquet.hadoop; @@ -82,7 +82,7 @@ public void close() throws IOException { * @param data input data to be read first and then written with outputWriter * @param outputWriter output parquet writer */ - public static void readFileIntoWriter(byte[] data, BdecParquetWriter outputWriter) { + public static void readFileIntoWriter(byte[] data, SnowflakeParquetWriter outputWriter) { try (BdecParquetReader reader = new BdecParquetReader(data)) { for (List record = reader.read(); record != null; record = reader.read()) { outputWriter.writeRow(record); diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java similarity index 91% rename from src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java rename to src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java index c3126847d..523ff94a1 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java @@ -19,6 +19,7 @@ import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.ParquetEncodingException; @@ -31,12 +32,13 @@ import org.apache.parquet.schema.Type; /** - * BDEC specific parquet writer. + * Snowflake specific parquet writer, supports BDEC file for FDN tables and parquet file for Iceberg + * tables. * *

Resides in parquet package because, it uses {@link InternalParquetRecordWriter} and {@link * CodecFactory} that are package private. */ -public class BdecParquetWriter implements AutoCloseable { +public class SnowflakeParquetWriter implements AutoCloseable { private final InternalParquetRecordWriter> writer; private final CodecFactory codecFactory; @@ -50,7 +52,7 @@ public class BdecParquetWriter implements AutoCloseable { private long rowsWritten = 0; /** - * Creates a BDEC specific parquet writer. + * Creates a Snowflake specific parquet writer. * * @param stream output * @param schema row schema @@ -60,7 +62,7 @@ public class BdecParquetWriter implements AutoCloseable { * exceeded we'll end up throwing * @throws IOException */ - public BdecParquetWriter( + public SnowflakeParquetWriter( ByteArrayOutputStream stream, MessageType schema, Map extraMetaData, @@ -78,7 +80,7 @@ public BdecParquetWriter( ParquetProperties encodingProps = createParquetProperties(); Configuration conf = new Configuration(); WriteSupport> writeSupport = - new BdecWriteSupport(schema, extraMetaData, channelName); + new SnowflakeWriteSupport(schema, extraMetaData, channelName); WriteSupport.WriteContext writeContext = writeSupport.init(conf); ParquetFileWriter fileWriter = @@ -138,6 +140,24 @@ public List getRowCountsFromFooter() { return blockRowCounts; } + /** @return extended metadata size (page index size + bloom filter size) */ + public long getExtendedMetadataSize() { + long extendedMetadataSize = 0; + for (BlockMetaData metadata : writer.getFooter().getBlocks()) { + for (ColumnChunkMetaData column : metadata.getColumns()) { + extendedMetadataSize += + (column.getColumnIndexReference() != null + ? column.getColumnIndexReference().getLength() + : 0) + + (column.getOffsetIndexReference() != null + ? 
column.getOffsetIndexReference().getLength() + : 0) + + column.getBloomFilterLength(); + } + } + return extendedMetadataSize; + } + public void writeRow(List row) { try { writer.write(row); @@ -263,16 +283,17 @@ public long getPos() { * *

This class is implemented as parquet library API requires, mostly to serialize user column * values depending on type into Parquet {@link RecordConsumer} in {@link - * BdecWriteSupport#write(List)}. + * SnowflakeWriteSupport#write(List)}. */ - private static class BdecWriteSupport extends WriteSupport> { + private static class SnowflakeWriteSupport extends WriteSupport> { MessageType schema; RecordConsumer recordConsumer; Map extraMetadata; private final String channelName; // TODO SNOW-672156: support specifying encodings and compression - BdecWriteSupport(MessageType schema, Map extraMetadata, String channelName) { + SnowflakeWriteSupport( + MessageType schema, Map extraMetadata, String channelName) { this.schema = schema; this.extraMetadata = extraMetadata; this.channelName = channelName; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 51aa1c289..ddc1fcfa2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -15,7 +15,7 @@ import net.snowflake.ingest.utils.Pair; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.BdecParquetWriter; +import org.apache.parquet.hadoop.SnowflakeParquetWriter; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -87,8 +87,8 @@ private List> createChannelDataPerTable(int metada channelData.setRowSequencer(1L); ByteArrayOutputStream stream = new ByteArrayOutputStream(); - BdecParquetWriter bdecParquetWriter = - new BdecParquetWriter( + SnowflakeParquetWriter snowflakeParquetWriter = + new SnowflakeParquetWriter( stream, schema, new HashMap<>(), @@ -100,7 +100,7 @@ private List> createChannelDataPerTable(int metada ? 
ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0, isIceberg); - bdecParquetWriter.writeRow(Collections.singletonList("1")); + snowflakeParquetWriter.writeRow(Collections.singletonList("1")); channelData.setVectors( new ParquetChunkData( Collections.singletonList(Collections.singletonList("A")), new HashMap<>())); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java index 52b719484..d76499c14 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelDataTest.java @@ -125,13 +125,8 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(oneCombined.getCurrentMinRealValue()); Assert.assertNull(oneCombined.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(5, oneCombined.getDistinctValues()); - Assert.assertEquals(5, oneCombined.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, oneCombined.getDistinctValues()); - Assert.assertNull(oneCombined.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); Assert.assertArrayEquals( "10".getBytes(StandardCharsets.UTF_8), twoCombined.getCurrentMinStrValue()); @@ -142,12 +137,7 @@ public void testGetCombinedColumnStatsMap() { Assert.assertNull(twoCombined.getCurrentMinRealValue()); Assert.assertNull(twoCombined.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(5, twoCombined.getDistinctValues()); - Assert.assertEquals(5, twoCombined.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, twoCombined.getDistinctValues()); - Assert.assertNull(twoCombined.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 
5 : -1, oneCombined.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 5 : -1, oneCombined.getNumberOfValues()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index a9583a3f6..a00cb538e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -32,14 +32,8 @@ public void testEmptyState() throws Exception { Assert.assertNull(stats.getCurrentMinIntValue()); Assert.assertNull(stats.getCurrentMaxIntValue()); Assert.assertEquals(0, stats.getCurrentNullCount()); - - if (enableNDVAndNV) { - Assert.assertEquals(0, stats.getDistinctValues()); - Assert.assertEquals(0, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 0 : -1, stats.getNumberOfValues()); } @Test @@ -49,25 +43,15 @@ public void testMinMaxStrNonCol() throws Exception { stats.addStrValue("bob"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - if (enableNDVAndNV) { - Assert.assertEquals(1, stats.getDistinctValues()); - Assert.assertEquals(1, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 
1 : -1, stats.getNumberOfValues()); stats.addStrValue("charlie"); Assert.assertArrayEquals("bob".getBytes(StandardCharsets.UTF_8), stats.getCurrentMinStrValue()); Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - if (enableNDVAndNV) { - Assert.assertEquals(2, stats.getDistinctValues()); - Assert.assertEquals(2, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addStrValue("alice"); Assert.assertArrayEquals( @@ -75,13 +59,8 @@ public void testMinMaxStrNonCol() throws Exception { Assert.assertArrayEquals( "charlie".getBytes(StandardCharsets.UTF_8), stats.getCurrentMaxStrValue()); - if (enableNDVAndNV) { - Assert.assertEquals(3, stats.getDistinctValues()); - Assert.assertEquals(3, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); Assert.assertNull(stats.getCurrentMaxRealValue()); @@ -99,37 +78,22 @@ public void testMinMaxInt() throws Exception { Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMaxIntValue()); - if (enableNDVAndNV) { - Assert.assertEquals(1, stats.getDistinctValues()); - Assert.assertEquals(1, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 
1 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(6)); Assert.assertEquals(BigInteger.valueOf((5)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - if (enableNDVAndNV) { - Assert.assertEquals(2, stats.getDistinctValues()); - Assert.assertEquals(2, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addIntValue(BigInteger.valueOf(4)); Assert.assertEquals(BigInteger.valueOf((4)), stats.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf((6)), stats.getCurrentMaxIntValue()); - if (enableNDVAndNV) { - Assert.assertEquals(3, stats.getDistinctValues()); - Assert.assertEquals(3, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinRealValue()); Assert.assertNull(stats.getCurrentMaxRealValue()); @@ -147,37 +111,22 @@ public void testMinMaxReal() throws Exception { Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(1, stats.getDistinctValues()); - Assert.assertEquals(1, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 
1 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 1 : -1, stats.getNumberOfValues()); stats.addRealValue(1.5); Assert.assertEquals(Double.valueOf(1), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(2, stats.getDistinctValues()); - Assert.assertEquals(2, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 2 : -1, stats.getNumberOfValues()); stats.addRealValue(.8); Assert.assertEquals(Double.valueOf(.8), stats.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(1.5), stats.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(3, stats.getDistinctValues()); - Assert.assertEquals(3, stats.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, stats.getDistinctValues()); - Assert.assertNull(stats.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 3 : -1, stats.getNumberOfValues()); Assert.assertNull(stats.getCurrentMinIntValue()); Assert.assertNull(stats.getCurrentMaxIntValue()); @@ -230,14 +179,8 @@ public void testGetCombinedStats() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - - if (enableNDVAndNV) { - Assert.assertEquals(7, result.getDistinctValues()); - Assert.assertEquals(8, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 
7 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -262,14 +205,8 @@ public void testGetCombinedStats() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(1), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - - if (enableNDVAndNV) { - Assert.assertEquals(7, result.getDistinctValues()); - Assert.assertEquals(8, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); Assert.assertNull(result.getCollationDefinitionString()); @@ -301,14 +238,8 @@ public void testGetCombinedStats() throws Exception { Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertEquals(5, result.getCurrentMaxLength()); - - if (enableNDVAndNV) { - Assert.assertEquals(7, result.getDistinctValues()); - Assert.assertEquals(8, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 
8 : -1, result.getNumberOfValues()); Assert.assertNull(result.getCurrentMinRealValue()); Assert.assertNull(result.getCurrentMaxRealValue()); @@ -332,14 +263,8 @@ public void testGetCombinedStatsNull() throws Exception { RowBufferStats result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); - - if (enableNDVAndNV) { - Assert.assertEquals(4, result.getDistinctValues()); - Assert.assertEquals(4, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -359,13 +284,8 @@ public void testGetCombinedStatsNull() throws Exception { result = RowBufferStats.getCombinedStats(one, two); Assert.assertEquals(Double.valueOf(2), result.getCurrentMinRealValue()); Assert.assertEquals(Double.valueOf(8), result.getCurrentMaxRealValue()); - if (enableNDVAndNV) { - Assert.assertEquals(4, result.getDistinctValues()); - Assert.assertEquals(4, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 
4 : -1, result.getNumberOfValues()); Assert.assertEquals(0, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -388,13 +308,8 @@ public void testGetCombinedStatsNull() throws Exception { "alpha".getBytes(StandardCharsets.UTF_8), result.getCurrentMinStrValue()); Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); - if (enableNDVAndNV) { - Assert.assertEquals(4, result.getDistinctValues()); - Assert.assertEquals(4, result.getNumberOfValues().longValue()); - } else { - Assert.assertEquals(-1, result.getDistinctValues()); - Assert.assertNull(result.getNumberOfValues()); - } + Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); + Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); Assert.assertEquals(1, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinRealValue()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 545a4827b..baa370e16 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal.datatypes; import static net.snowflake.ingest.utils.Constants.ROLE; @@ -96,20 +100,23 @@ protected void setUp( conn.createStatement().execute(String.format("use database %s;", databaseName)); conn.createStatement().execute(String.format("use schema %s;", schemaName)); - switch (serializationPolicy) { - case COMPATIBLE: - conn.createStatement() - .execute( - String.format( - "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';", - schemaName)); - break; - case OPTIMIZED: - conn.createStatement() - .execute( - String.format( - "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", schemaName)); - break; + if (isIceberg) { + switch (serializationPolicy) { + case COMPATIBLE: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'COMPATIBLE';", + schemaName)); + break; + case OPTIMIZED: + conn.createStatement() + .execute( + String.format( + "alter schema %s set STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED';", + schemaName)); + break; + } } conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); @@ -163,7 +170,7 @@ protected String createTable(String dataType) throws SQLException { protected String createIcebergTable(String dataType) throws SQLException { String tableName = getRandomIdentifier(); String baseLocation = - String.format("%s/%s/%s", databaseName, dataType.replace(" ", "_"), tableName); + String.format("SDK_IT/%s/%s/%s", databaseName, dataType.replace(" ", "_"), tableName); conn.createStatement() .execute( String.format( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index e4b0783d4..74eee26ed 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ 
b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import java.math.BigDecimal; @@ -23,7 +27,7 @@ public static Object[][] parameters() { @Parameterized.Parameter public static String compressionAlgorithm; - @Parameterized.Parameter(1) + @Parameterized.Parameter(1) public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; @Before diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index f99e82d30..ddf473dc5 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import com.fasterxml.jackson.databind.JsonNode; @@ -77,14 +81,20 @@ public void testStructuredDataType() throws Exception { .extracting(SFException::getVendorCode) .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); - /* Null struct, map list. TODO: SNOW-1727532 Should be fixed with null values EP calculation. */ + /* Null struct, map list.
*/ Assertions.assertThatThrownBy( () -> assertStructuredDataType("object(a int, b string, c boolean) not null", null)) - .isInstanceOf(NullPointerException.class); + .isInstanceOf(SFException.class) + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); Assertions.assertThatThrownBy(() -> assertStructuredDataType("map(string, int) not null", null)) - .isInstanceOf(NullPointerException.class); + .isInstanceOf(SFException.class) + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); Assertions.assertThatThrownBy(() -> assertStructuredDataType("array(int) not null", null)) - .isInstanceOf(NullPointerException.class); + .isInstanceOf(SFException.class) + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); /* Nested data types. Should be fixed. Fixed in server side. */ Assertions.assertThatThrownBy(