diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 9172c4328..f6c26f9bd 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -641,13 +641,15 @@ public synchronized void close(String name) { * * @param rowCount: count of rows in the given buffer * @param colStats: map of column name to RowBufferStats + * @param setDefaultValues: whether to set default values for null fields in the EPs * @return the EPs built from column stats */ - static EpInfo buildEpInfoFromStats(long rowCount, Map colStats) { + static EpInfo buildEpInfoFromStats( + long rowCount, Map colStats, boolean setDefaultValues) { EpInfo epInfo = new EpInfo(rowCount, new HashMap<>()); for (Map.Entry colStat : colStats.entrySet()) { RowBufferStats stat = colStat.getValue(); - FileColumnProperties dto = new FileColumnProperties(stat); + FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues); String colName = colStat.getValue().getColumnDisplayName(); epInfo.getColumnEps().put(colName, dto); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 939e6d255..4bc3be4e8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -62,13 +62,15 @@ class BlobBuilder { * belongs to the same table. 
Will error if this is not the case * @param bdecVersion version of blob * @param encrypt If the output chunk is encrypted or not + * @param isIceberg If the streaming client is for an Iceberg table or not * @return {@link Blob} data */ static Blob constructBlobAndMetadata( String filePath, List>> blobData, Constants.BdecVersion bdecVersion, - boolean encrypt) + boolean encrypt, + boolean isIceberg) throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { @@ -133,9 +135,12 @@ static Blob constructBlobAndMetadata( .setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId()) .setEpInfo( AbstractRowBuffer.buildEpInfoFromStats( - serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined)) + serializedChunk.rowCount, + serializedChunk.columnEpStatsMapCombined, + !isIceberg)) .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst()) .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond()) + .setIsIceberg(isIceberg) .build(); // Add chunk metadata and data to the list diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java index 7b42dbc5e..38b58a160 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChunkMetadata.java @@ -4,8 +4,10 @@ package net.snowflake.ingest.streaming.internal; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.Utils; /** Metadata for a chunk that sends to Snowflake as part of the register blob request */ @@ -22,6 +24,8 @@ class ChunkMetadata { private final Long encryptionKeyId; private final Long firstInsertTimeInMs; private 
final Long lastInsertTimeInMs; + private Integer parquetMajorVersion; + private Integer parquetMinorVersion; static Builder builder() { return new Builder(); @@ -43,6 +47,7 @@ static class Builder { private Long encryptionKeyId; private Long firstInsertTimeInMs; private Long lastInsertTimeInMs; + private boolean isIceberg; Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) { this.dbName = channelFlushContext.getDbName(); @@ -100,6 +105,11 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) { return this; } + Builder setIsIceberg(boolean isIceberg) { + this.isIceberg = isIceberg; + return this; + } + ChunkMetadata build() { return new ChunkMetadata(this); } @@ -130,6 +140,11 @@ private ChunkMetadata(Builder builder) { this.encryptionKeyId = builder.encryptionKeyId; this.firstInsertTimeInMs = builder.firstInsertTimeInMs; this.lastInsertTimeInMs = builder.lastInsertTimeInMs; + + if (builder.isIceberg) { + this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION; + this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION; + } } /** @@ -200,4 +215,16 @@ Long getFirstInsertTimeInMs() { Long getLastInsertTimeInMs() { return this.lastInsertTimeInMs; } + + @JsonProperty("major_vers") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + Integer getMajorVersion() { + return this.parquetMajorVersion; + } + + @JsonProperty("minor_vers") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + Integer getMinorVersion() { + return this.parquetMinorVersion; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java index a305a52f7..4e06476cd 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java @@ -45,23 +45,27 @@ class FileColumnProperties { public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d; 
FileColumnProperties(RowBufferStats stats) { + this(stats, true); + } + + FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) { this.setColumnOrdinal(stats.getOrdinal()); this.setCollation(stats.getCollationDefinitionString()); this.setMaxIntValue( stats.getCurrentMaxIntValue() == null - ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP + ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null) : stats.getCurrentMaxIntValue()); this.setMinIntValue( stats.getCurrentMinIntValue() == null - ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP + ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null) : stats.getCurrentMinIntValue()); this.setMinRealValue( stats.getCurrentMinRealValue() == null - ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP + ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null) : stats.getCurrentMinRealValue()); this.setMaxRealValue( stats.getCurrentMaxRealValue() == null - ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP + ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null) : stats.getCurrentMaxRealValue()); this.setMaxLength(stats.getCurrentMaxLength()); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 75bbb6bd8..208fd55fb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -603,7 +603,8 @@ BlobMetadata buildAndUpload( blobPath.fileName, blobData, bdecVersion, - this.owningClient.getInternalParameterProvider().getEnableChunkEncryption()); + this.owningClient.getInternalParameterProvider().getEnableChunkEncryption(), + this.owningClient.getInternalParameterProvider().getIsIcebergMode()); blob.blobStats.setBuildDurationMs(buildContext); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java index 46a99b671..587877a5b 
100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -17,4 +17,8 @@ boolean getEnableChunkEncryption() { // mode does not need client-side encryption. return !isIcebergMode; } + + boolean getIsIcebergMode() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 35eed3469..4504c1c01 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -71,6 +71,9 @@ public class Constants { public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/"; public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/"; + public static final int PARQUET_MAJOR_VERSION = 1; + public static final int PARQUET_MINOR_VERSION = 0; + public enum WriteMode { CLOUD_STORAGE, REST_API, diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index eb5c20f03..5d6b75254 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -107,6 +107,12 @@ public BdecParquetWriter( /** @return List of row counts per block stored in the parquet footer */ public List getRowCountsFromFooter() { + if (writer.getFooter().getBlocks().size() > 1) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + "Expecting only one row group in the parquet file, but found " + + writer.getFooter().getBlocks().size()); + } final List blockRowCounts = new ArrayList<>(); for (BlockMetaData metadata : writer.getFooter().getBlocks()) { blockRowCounts.add(metadata.getRowCount()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 1cc3a2953..781e53959 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -37,7 +37,8 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(1)), Constants.BdecVersion.THREE, - encrypt); + encrypt, + !encrypt); // Construction fails if metadata contains 0 rows and data 1 row try { @@ -45,7 +46,8 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0)), Constants.BdecVersion.THREE, - encrypt); + encrypt, + !encrypt); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1")); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 2d1bd9646..b46f8c800 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -899,7 +899,7 @@ public void testBuildAndUpload() throws Exception { EpInfo expectedChunkEpInfo = AbstractRowBuffer.buildEpInfoFromStats( - 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2)); + 3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode); ChannelMetadata expectedChannel1Metadata = ChannelMetadata.builder() @@ -1110,7 +1110,7 @@ public void testBlobBuilder() throws Exception { stats1.addIntValue(new BigInteger("10")); stats1.addIntValue(new BigInteger("15")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode); ChannelMetadata channelMetadata = 
ChannelMetadata.builder() diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 70b950fda..780952fe8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -575,7 +575,7 @@ public void testBuildEpInfoFromStats() { colStats.put("intColumn", stats1); colStats.put("strColumn", stats2); - EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats); + EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode); Map columnResults = result.getColumnEps(); Assert.assertEquals(2, columnResults.keySet().size()); @@ -610,25 +610,29 @@ public void testBuildEpInfoFromNullColumnStats() { colStats.put(intColName, stats1); colStats.put(realColName, stats2); - EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats); + EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode); Map columnResults = result.getColumnEps(); Assert.assertEquals(2, columnResults.keySet().size()); FileColumnProperties intColumnResult = columnResults.get(intColName); Assert.assertEquals(-1, intColumnResult.getDistinctValues()); Assert.assertEquals( - FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue()); + isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, + intColumnResult.getMinIntValue()); Assert.assertEquals( - FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMaxIntValue()); + isIcebergMode ? 
null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, + intColumnResult.getMaxIntValue()); Assert.assertEquals(1, intColumnResult.getNullCount()); Assert.assertEquals(0, intColumnResult.getMaxLength()); FileColumnProperties realColumnResult = columnResults.get(realColName); Assert.assertEquals(-1, intColumnResult.getDistinctValues()); Assert.assertEquals( - FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue()); + isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, + realColumnResult.getMinRealValue()); Assert.assertEquals( - FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMaxRealValue()); + isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, + realColumnResult.getMaxRealValue()); Assert.assertEquals(1, realColumnResult.getNullCount()); Assert.assertEquals(0, realColumnResult.getMaxLength()); } @@ -651,7 +655,7 @@ public void testInvalidEPInfo() { colStats.put("strColumn", stats2); try { - AbstractRowBuffer.buildEpInfoFromStats(1, colStats); + AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode); fail("should fail when row count is smaller than null count."); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index c9ca86b35..1ce310706 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -567,7 +567,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { Map columnEps = new HashMap<>(); columnEps.put("column", new RowBufferStats("COL1")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, 
columnEps); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode); ChunkMetadata chunkMetadata = ChunkMetadata.builder() @@ -616,7 +616,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { private Pair, Set> getRetryBlobMetadata() { Map columnEps = new HashMap<>(); columnEps.put("column", new RowBufferStats("COL1")); - EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps); + EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode); ChannelMetadata channelMetadata1 = ChannelMetadata.builder()