diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 9172c4328..f6c26f9bd 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -641,13 +641,15 @@ public synchronized void close(String name) {
    *
    * @param rowCount: count of rows in the given buffer
    * @param colStats: map of column name to RowBufferStats
+   * @param setDefaultValues: whether to set default values for null fields in the EPs
    * @return the EPs built from column stats
    */
-  static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> colStats) {
+  static EpInfo buildEpInfoFromStats(
+      long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
     EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
     for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
       RowBufferStats stat = colStat.getValue();
-      FileColumnProperties dto = new FileColumnProperties(stat);
+      FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
       String colName = colStat.getValue().getColumnDisplayName();
       epInfo.getColumnEps().put(colName, dto);
     }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index 939e6d255..7ad11dc3a 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -61,14 +61,13 @@ class BlobBuilder {
    * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
    *     belongs to the same table. Will error if this is not the case
    * @param bdecVersion version of blob
-   * @param encrypt If the output chunk is encrypted or not
    * @return {@link Blob} data
    */
   static <T> Blob constructBlobAndMetadata(
       String filePath,
       List<List<ChannelData<T>>> blobData,
       Constants.BdecVersion bdecVersion,
-      boolean encrypt)
+      InternalParameterProvider internalParameterProvider)
       throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
           InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
           BadPaddingException {
@@ -91,7 +90,7 @@ static <T> Blob constructBlobAndMetadata(
         final int chunkLength;
         final int compressedChunkDataSize;
 
-        if (encrypt) {
+        if (internalParameterProvider.getEnableChunkEncryption()) {
           Pair<byte[], Integer> paddedChunk =
               padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
           byte[] paddedChunkData = paddedChunk.getFirst();
@@ -133,9 +132,12 @@ static <T> Blob constructBlobAndMetadata(
                 .setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
                 .setEpInfo(
                     AbstractRowBuffer.buildEpInfoFromStats(
-                        serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
+                        serializedChunk.rowCount,
+                        serializedChunk.columnEpStatsMapCombined,
+                        internalParameterProvider.setDefaultValuesInEp()))
                 .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
                 .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
+                .setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
                 .build();
 
         // Add chunk metadata and data to the list
@@ -147,7 +149,7 @@ static <T> Blob constructBlobAndMetadata(
         logger.logInfo(
             "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
                 + " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
-                + " encryption={}, bdecVersion={}",
+                + " encrypt={}, bdecVersion={}",
             filePath,
             firstChannelFlushContext.getFullyQualifiedTableName(),
             serializedChunk.rowCount,
@@ -155,7 +157,7 @@ static <T> Blob constructBlobAndMetadata(
             serializedChunk.chunkEstimatedUncompressedSize,
             chunkLength,
             compressedChunkDataSize,
-            encrypt,
+            internalParameterProvider.getEnableChunkEncryption(),
             bdecVersion);
       }
     }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
index 7b42dbc5e..c0cb218ac 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java
@@ -4,8 +4,10 @@
 
 package net.snowflake.ingest.streaming.internal;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.Utils;
 
 /** Metadata for a chunk that sends to Snowflake as part of the register blob request */
@@ -22,6 +24,8 @@ class ChunkMetadata {
   private final Long encryptionKeyId;
   private final Long firstInsertTimeInMs;
   private final Long lastInsertTimeInMs;
+  private Integer parquetMajorVersion;
+  private Integer parquetMinorVersion;
 
   static Builder builder() {
     return new Builder();
@@ -43,6 +47,7 @@ static class Builder {
     private Long encryptionKeyId;
     private Long firstInsertTimeInMs;
     private Long lastInsertTimeInMs;
+    private boolean setMajorMinorVersionInEp;
 
     Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
       this.dbName = channelFlushContext.getDbName();
@@ -100,6 +105,11 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
       return this;
     }
 
+    Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
+      this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
+      return this;
+    }
+
     ChunkMetadata build() {
       return new ChunkMetadata(this);
     }
@@ -130,6 +140,11 @@ private ChunkMetadata(Builder builder) {
     this.encryptionKeyId = builder.encryptionKeyId;
     this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
     this.lastInsertTimeInMs = builder.lastInsertTimeInMs;
+
+    if (builder.setMajorMinorVersionInEp) {
+      this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
+      this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
+    }
   }
 
   /**
@@ -200,4 +215,18 @@ Long getFirstInsertTimeInMs() {
   Long getLastInsertTimeInMs() {
     return this.lastInsertTimeInMs;
   }
+
+  // Snowflake service had a bug that did not allow the client to add new json fields in some
+  // contracts; thus these new fields have a NON_DEFAULT attribute.
+  @JsonProperty("major_vers")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  Integer getMajorVersion() {
+    return this.parquetMajorVersion;
+  }
+
+  @JsonProperty("minor_vers")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  Integer getMinorVersion() {
+    return this.parquetMinorVersion;
+  }
 }
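The JsonInclude.Include.NON_DEFAULT inclusion on the new getters above is what keeps older register-blob contracts from ever seeing the version fields. A minimal, self-contained sketch of that Jackson behavior (hypothetical DTO, not the SDK's ChunkMetadata): a boxed Integer property that is still null is simply omitted from the serialized JSON, so non-Iceberg chunks keep producing the same payload as before.

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NonDefaultFieldDemo {
  // Hypothetical stand-in for ChunkMetadata; only the inclusion behavior matters here.
  static class ChunkDto {
    @JsonProperty("chunk_length")
    public Integer chunkLength = 100;

    @JsonProperty("major_vers")
    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
    public Integer majorVersion; // stays null outside Iceberg mode
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    ChunkDto bdecChunk = new ChunkDto();
    // Prints {"chunk_length":100} -- the null "major_vers" property is omitted entirely.
    System.out.println(mapper.writeValueAsString(bdecChunk));

    ChunkDto icebergChunk = new ChunkDto();
    icebergChunk.majorVersion = 1;
    // Prints {"chunk_length":100,"major_vers":1}
    System.out.println(mapper.writeValueAsString(icebergChunk));
  }
}

Because the property only becomes non-null when Iceberg mode sets it, existing BDEC traffic keeps its previous register-blob payload unchanged.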
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index 7e482679a..dfadd029a 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -4,6 +4,7 @@
 
 package net.snowflake.ingest.streaming.internal;
 
+import java.util.Optional;
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ParameterProvider;
 
@@ -18,6 +19,8 @@ public class ClientBufferParameters {
 
   private Constants.BdecParquetCompression bdecParquetCompression;
 
+  private final Optional<Integer> maxRowGroups;
+
   private boolean isIcebergMode;
 
   /**
@@ -32,11 +35,13 @@ private ClientBufferParameters(
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic,
+      Optional<Integer> maxRowGroups,
       boolean isIcebergMode) {
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
     this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
+    this.maxRowGroups = maxRowGroups;
     this.isIcebergMode = isIcebergMode;
   }
 
@@ -62,6 +67,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
         clientInternal != null
             ? clientInternal.isIcebergMode()
             : ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
+    this.maxRowGroups =
+        isIcebergMode
+            ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
+            : Optional.empty();
   }
 
   /**
@@ -75,12 +84,14 @@ public static ClientBufferParameters test_createClientBufferParameters(
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic,
+      Optional<Integer> maxRowGroups,
       boolean isIcebergMode) {
     return new ClientBufferParameters(
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
         bdecParquetCompression,
         enableNewJsonParsingLogic,
+        maxRowGroups,
         isIcebergMode);
   }
 
@@ -103,4 +114,8 @@ public boolean isEnableNewJsonParsingLogic() {
   public boolean getIsIcebergMode() {
     return isIcebergMode;
   }
+
+  public Optional<Integer> getMaxRowGroups() {
+    return maxRowGroups;
+  }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
index a305a52f7..0ec671bf7 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
@@ -44,24 +44,24 @@ class FileColumnProperties {
 
   // Default value to use for min/max real when all data in the given column is NULL
   public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;
 
-  FileColumnProperties(RowBufferStats stats) {
+  FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
     this.setColumnOrdinal(stats.getOrdinal());
     this.setCollation(stats.getCollationDefinitionString());
     this.setMaxIntValue(
         stats.getCurrentMaxIntValue() == null
-            ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
            : stats.getCurrentMaxIntValue());
     this.setMinIntValue(
         stats.getCurrentMinIntValue() == null
-            ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
            : stats.getCurrentMinIntValue());
     this.setMinRealValue(
         stats.getCurrentMinRealValue() == null
-            ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
            : stats.getCurrentMinRealValue());
     this.setMaxRealValue(
         stats.getCurrentMaxRealValue() == null
-            ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
            : stats.getCurrentMaxRealValue());
     this.setMaxLength(stats.getCurrentMaxLength());
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index 75bbb6bd8..2e64f77b8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -597,13 +597,11 @@ BlobMetadata buildAndUpload(
           InvalidKeyException {
     Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);
 
+    InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
     // Construct the blob along with the metadata of the blob
     BlobBuilder.Blob blob =
         BlobBuilder.constructBlobAndMetadata(
-            blobPath.fileName,
-            blobData,
-            bdecVersion,
-            this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
+            blobPath.fileName, blobData, bdecVersion, paramProvider);
 
     blob.blobStats.setBuildDurationMs(buildContext);
 
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
index 46a99b671..91fe3268e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
@@ -6,6 +6,8 @@
 
 /** A class to provide non-configurable constants depends on Iceberg or non-Iceberg mode */
 class InternalParameterProvider {
+  public static final Integer MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT = 1;
+
   private final boolean isIcebergMode;
 
   InternalParameterProvider(boolean isIcebergMode) {
@@ -17,4 +19,16 @@ boolean getEnableChunkEncryption() {
     // mode does not need client-side encryption.
     return !isIcebergMode;
   }
+
+  boolean setDefaultValuesInEp() {
+    // When in Iceberg mode, we need to populate nulls (instead of zeroes) in the minIntValue /
+    // maxIntValue / minRealValue / maxRealValue fields of the EP Metadata.
+    return !isIcebergMode;
+  }
+
+  boolean setMajorMinorVersionInEp() {
+    // When in Iceberg mode, we need to explicitly populate the major and minor version of parquet
+    // in the EP metadata.
+    return isIcebergMode;
+  }
 }
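The two toggles above feed straight into FileColumnProperties. A standalone sketch of the null-vs-placeholder rule (illustrative names; the SDK's integer placeholder is assumed here to be BigInteger.ZERO, mirroring the 0d real default shown earlier): with setDefaultValues true, an all-NULL column still reports the 0 placeholders, while with it false, as in Iceberg mode, the min/max fields stay null.

import java.math.BigInteger;

public class EpDefaultsSketch {
  // Assumed stand-in for FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP.
  static final BigInteger DEFAULT_MIN_MAX_INT_VAL_FOR_EP = BigInteger.ZERO;

  // Same ternary shape as the FileColumnProperties constructor: fall back to the placeholder
  // only when setDefaultValues is true, otherwise propagate null for an all-NULL column.
  static BigInteger epMinInt(BigInteger observedMin, boolean setDefaultValues) {
    return observedMin == null
        ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
        : observedMin;
  }

  public static void main(String[] args) {
    System.out.println(epMinInt(null, true));                     // 0    (BDEC behavior)
    System.out.println(epMinInt(null, false));                    // null (Iceberg mode)
    System.out.println(epMinInt(BigInteger.valueOf(42), false));  // 42   (real stats always win)
  }
}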
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index cfb60f94f..8865a88c3 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -9,6 +9,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
@@ -27,6 +28,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private static final Logging logger = new Logging(ParquetFlusher.class);
   private final MessageType schema;
   private final long maxChunkSizeInBytes;
+  private final Optional<Integer> maxRowGroups;
 
   private final Constants.BdecParquetCompression bdecParquetCompression;
 
@@ -34,9 +36,11 @@
   public ParquetFlusher(
       MessageType schema,
       long maxChunkSizeInBytes,
+      Optional<Integer> maxRowGroups,
       Constants.BdecParquetCompression bdecParquetCompression) {
     this.schema = schema;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
+    this.maxRowGroups = maxRowGroups;
     this.bdecParquetCompression = bdecParquetCompression;
   }
 
@@ -126,6 +130,7 @@ private SerializationResult serializeFromJavaObjects(
             metadata,
             firstChannelFullyQualifiedTableName,
             maxChunkSizeInBytes,
+            maxRowGroups,
             bdecParquetCompression);
     rows.forEach(parquetWriter::writeRow);
     parquetWriter.close();
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 395bea98f..b8054ad9f 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -293,6 +293,7 @@ public Flusher<ParquetChunkData> createFlusher() {
     return new ParquetFlusher(
         schema,
         clientBufferParameters.getMaxChunkSizeInBytes(),
+        clientBufferParameters.getMaxRowGroups(),
        clientBufferParameters.getBdecParquetCompression());
   }
 }
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 35eed3469..4504c1c01 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -71,6 +71,9 @@ public class Constants {
   public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
   public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";
 
+  public static final int PARQUET_MAJOR_VERSION = 1;
+  public static final int PARQUET_MINOR_VERSION = 0;
+
   public enum WriteMode {
     CLOUD_STORAGE,
     REST_API,
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
index eb5c20f03..45c65a4ea 100644
--- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
+++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -9,6 +9,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
@@ -39,6 +40,10 @@ public class BdecParquetWriter implements AutoCloseable {
   private final InternalParquetRecordWriter<List<Object>> writer;
   private final CodecFactory codecFactory;
+
+  // Optional cap on the max number of row groups allowed per file; if it is exceeded, an
+  // exception is thrown
+  private final Optional<Integer> maxRowGroups;
   private long rowsWritten = 0;
 
   /**
@@ -48,6 +53,8 @@ public class BdecParquetWriter implements AutoCloseable {
    * @param schema row schema
    * @param extraMetaData extra metadata
    * @param channelName name of the channel that is using the writer
+   * @param maxRowGroups Optional cap on the max number of row groups allowed per file; if this is
+   *     exceeded, an exception is thrown
    * @throws IOException
    */
   public BdecParquetWriter(
@@ -56,9 +63,11 @@ public BdecParquetWriter(
       Map<String, String> extraMetaData,
       String channelName,
       long maxChunkSizeInBytes,
+      Optional<Integer> maxRowGroups,
       Constants.BdecParquetCompression bdecParquetCompression)
       throws IOException {
     OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
+    this.maxRowGroups = maxRowGroups;
     ParquetProperties encodingProps = createParquetProperties();
     Configuration conf = new Configuration();
     WriteSupport<List<Object>> writeSupport =
@@ -107,6 +116,14 @@ public BdecParquetWriter(
 
   /** @return List of row counts per block stored in the parquet footer */
   public List<Long> getRowCountsFromFooter() {
+    if (maxRowGroups.isPresent() && writer.getFooter().getBlocks().size() > maxRowGroups.get()) {
+      throw new SFException(
+          ErrorCode.INTERNAL_ERROR,
+          String.format(
+              "Expecting only %d row group(s) in the parquet file, but found %d",
+              maxRowGroups.get(), writer.getFooter().getBlocks().size()));
+    }
+
     final List<Long> blockRowCounts = new ArrayList<>();
     for (BlockMetaData metadata : writer.getFooter().getBlocks()) {
       blockRowCounts.add(metadata.getRowCount());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 1cc3a2953..fff1fe53e 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -9,6 +9,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Pair;
@@ -23,12 +24,12 @@
 @RunWith(Parameterized.class)
 public class BlobBuilderTest {
 
-  @Parameterized.Parameters(name = "encrypt: {0}")
-  public static Object[] encrypt() {
+  @Parameterized.Parameters(name = "isIceberg: {0}")
+  public static Object[] isIceberg() {
     return new Object[] {false, true};
   }
 
-  @Parameterized.Parameter public boolean encrypt;
+  @Parameterized.Parameter public boolean isIceberg;
 
   @Test
   public void testSerializationErrors() throws Exception {
@@ -37,7 +38,7 @@ public void testSerializationErrors() throws Exception {
         "a.bdec",
         Collections.singletonList(createChannelDataPerTable(1)),
         Constants.BdecVersion.THREE,
-        encrypt);
+        new InternalParameterProvider(isIceberg));
 
     // Construction fails if metadata contains 0 rows and data 1 row
     try {
@@ -45,7 +46,7 @@ public void testSerializationErrors() throws Exception {
           "a.bdec",
           Collections.singletonList(createChannelDataPerTable(0)),
           Constants.BdecVersion.THREE,
-          encrypt);
+          new InternalParameterProvider(isIceberg));
     } catch (SFException e) {
       Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
       Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
@@ -67,7 +68,12 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
     String columnName = "C1";
     ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
     MessageType schema = createSchema(columnName);
-    Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP))
+    Mockito.doReturn(
+            new ParquetFlusher(
+                schema,
+                100L,
+                isIceberg ? Optional.of(1) : Optional.empty(),
+                Constants.BdecParquetCompression.GZIP))
         .when(channelData)
         .createFlusher();
 
@@ -80,6 +86,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
             new HashMap<>(),
             "CHANNEL",
             1000,
+            isIceberg ? Optional.of(1) : Optional.empty(),
             Constants.BdecParquetCompression.GZIP);
     bdecParquetWriter.writeRow(Collections.singletonList("1"));
     channelData.setVectors(
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
index 11e128ef7..63ba51abb 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
@@ -2,8 +2,17 @@
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class FileColumnPropertiesTest {
+  @Parameterized.Parameters(name = "isIceberg: {0}")
+  public static Object[] isIceberg() {
+    return new Object[] {false, true};
+  }
+
+  @Parameterized.Parameter public boolean isIceberg;
 
   @Test
   public void testFileColumnPropertiesConstructor() {
@@ -11,7 +20,7 @@ public void testFileColumnPropertiesConstructor() {
     RowBufferStats stats = new RowBufferStats("COL", null, 1);
     stats.addStrValue("bcd");
     stats.addStrValue("abcde");
-    FileColumnProperties props = new FileColumnProperties(stats);
+    FileColumnProperties props = new FileColumnProperties(stats, isIceberg);
     Assert.assertEquals(1, props.getColumnOrdinal());
     Assert.assertEquals("6162636465", props.getMinStrValue());
     Assert.assertNull(props.getMinStrNonCollated());
@@ -22,7 +31,7 @@ public void testFileColumnPropertiesConstructor() {
     stats = new RowBufferStats("COL", null, 1);
     stats.addStrValue("aßßßßßßßßßßßßßßßß");
     Assert.assertEquals(33, stats.getCurrentMinStrValue().length);
-    props = new FileColumnProperties(stats);
+    props = new FileColumnProperties(stats, isIceberg);
     Assert.assertEquals(1, props.getColumnOrdinal());
     Assert.assertNull(props.getMinStrNonCollated());
     Assert.assertNull(props.getMaxStrNonCollated());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index 2d1bd9646..b46f8c800 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -899,7 +899,7 @@ public void testBuildAndUpload() throws Exception {
 
     EpInfo expectedChunkEpInfo =
         AbstractRowBuffer.buildEpInfoFromStats(
-            3, ChannelData.getCombinedColumnStatsMap(eps1, eps2));
+            3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode);
 
     ChannelMetadata expectedChannel1Metadata =
         ChannelMetadata.builder()
@@ -1110,7 +1110,7 @@ public void testBlobBuilder() throws Exception {
     stats1.addIntValue(new BigInteger("10"));
     stats1.addIntValue(new BigInteger("15"));
 
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1);
+    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode);
 
     ChannelMetadata channelMetadata =
         ChannelMetadata.builder()
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 70b950fda..393750e25 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -17,6 +17,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -150,6 +151,7 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
             MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
             Constants.BdecParquetCompression.GZIP,
             ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT,
+            isIcebergMode ? Optional.of(1) : Optional.empty(),
             isIcebergMode),
         null,
         null);
@@ -575,7 +577,7 @@ public void testBuildEpInfoFromStats() {
     colStats.put("intColumn", stats1);
     colStats.put("strColumn", stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats);
+    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
     Map<String, FileColumnProperties> columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
 
@@ -610,25 +612,29 @@ public void testBuildEpInfoFromNullColumnStats() {
     colStats.put(intColName, stats1);
     colStats.put(realColName, stats2);
 
-    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats);
+    EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
     Map<String, FileColumnProperties> columnResults = result.getColumnEps();
     Assert.assertEquals(2, columnResults.keySet().size());
 
     FileColumnProperties intColumnResult = columnResults.get(intColName);
     Assert.assertEquals(-1, intColumnResult.getDistinctValues());
     Assert.assertEquals(
-        FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue());
+        isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
+        intColumnResult.getMinIntValue());
     Assert.assertEquals(
-        FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMaxIntValue());
+        isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
+        intColumnResult.getMaxIntValue());
     Assert.assertEquals(1, intColumnResult.getNullCount());
     Assert.assertEquals(0, intColumnResult.getMaxLength());
 
     FileColumnProperties realColumnResult = columnResults.get(realColName);
     Assert.assertEquals(-1, intColumnResult.getDistinctValues());
     Assert.assertEquals(
-        FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue());
+        isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
+        realColumnResult.getMinRealValue());
     Assert.assertEquals(
-        FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMaxRealValue());
+        isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
+        realColumnResult.getMaxRealValue());
     Assert.assertEquals(1, realColumnResult.getNullCount());
     Assert.assertEquals(0, realColumnResult.getMaxLength());
   }
@@ -651,7 +657,7 @@ public void testInvalidEPInfo() {
     colStats.put("strColumn", stats2);
 
     try {
-      AbstractRowBuffer.buildEpInfoFromStats(1, colStats);
+      AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode);
       fail("should fail when row count is smaller than null count.");
     } catch (SFException e) {
       Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index c9ca86b35..1ce310706 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -567,7 +567,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
 
     Map<String, RowBufferStats> columnEps = new HashMap<>();
     columnEps.put("column", new RowBufferStats("COL1"));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps);
+    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
 
     ChunkMetadata chunkMetadata =
         ChunkMetadata.builder()
@@ -616,7 +616,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
   private Pair<List<BlobMetadata>, Set<ChunkRegisterStatus>> getRetryBlobMetadata() {
     Map<String, RowBufferStats> columnEps = new HashMap<>();
     columnEps.put("column", new RowBufferStats("COL1"));
-    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps);
+    EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
 
     ChannelMetadata channelMetadata1 =
         ChannelMetadata.builder()
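To close the loop on the maxRowGroups plumbing, here is a runnable sketch of the guard that getRowCountsFromFooter() gains above (hypothetical names; IllegalStateException stands in for the SDK's SFException): the cap is empty for classic BDEC files and Optional.of(1) in Iceberg mode, so an unexpected extra row group fails fast during flush.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class RowGroupCapSketch {
  // Same shape as the new check in BdecParquetWriter#getRowCountsFromFooter; blockRowCounts
  // stands in for the per-row-group row counts read from the Parquet footer.
  static List<Long> checkRowGroupCount(List<Long> blockRowCounts, Optional<Integer> maxRowGroups) {
    if (maxRowGroups.isPresent() && blockRowCounts.size() > maxRowGroups.get()) {
      throw new IllegalStateException(
          String.format(
              "Expecting only %d row group(s) in the parquet file, but found %d",
              maxRowGroups.get(), blockRowCounts.size()));
    }
    return blockRowCounts;
  }

  public static void main(String[] args) {
    // Non-Iceberg mode: no cap, any number of row groups passes through.
    System.out.println(checkRowGroupCount(Arrays.asList(10L, 5L), Optional.empty()));
    // Iceberg mode: the cap is MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT = 1, so one block is fine...
    System.out.println(checkRowGroupCount(Arrays.asList(10L), Optional.of(1)));
    // ...but a second block fails fast.
    try {
      checkRowGroupCount(Arrays.asList(10L, 5L), Optional.of(1));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}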