Skip to content

Commit

Permalink
Fix EP
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Sep 20, 2024
1 parent 601d7eb commit a29eca9
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,15 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setDefaultValues: whether to set default values for null fields in the EPs
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> colStats) {
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
FileColumnProperties dto = new FileColumnProperties(stat);
FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ class BlobBuilder {
* belongs to the same table. Will error if this is not the case
* @param bdecVersion version of blob
* @param encrypt If the output chunk is encrypted or not
* @param isIceberg Whether the streaming client is for an Iceberg table or not
* @return {@link Blob} data
*/
static <T> Blob constructBlobAndMetadata(
String filePath,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
boolean encrypt)
boolean encrypt,
boolean isIceberg)
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
BadPaddingException {
Expand Down Expand Up @@ -133,9 +135,12 @@ static <T> Blob constructBlobAndMetadata(
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
.setEpInfo(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
!isIceberg))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.setIsIceberg(isIceberg)
.build();

// Add chunk metadata and data to the list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;

/** Metadata for a chunk that sends to Snowflake as part of the register blob request */
Expand All @@ -22,6 +24,8 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
private Integer parquetMajorVersion;
private Integer parquetMinorVersion;

static Builder builder() {
return new Builder();
Expand All @@ -43,6 +47,7 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
private boolean isIceberg;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
Expand Down Expand Up @@ -100,6 +105,11 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}

/**
 * Marks whether this chunk belongs to an Iceberg table. When true, the built
 * {@link ChunkMetadata} populates the parquet major/minor version fields.
 *
 * @param isIceberg true if the chunk targets an Iceberg table
 * @return this builder, for chaining
 */
Builder setIsIceberg(boolean isIceberg) {
this.isIceberg = isIceberg;
return this;
}

ChunkMetadata build() {
return new ChunkMetadata(this);
}
Expand Down Expand Up @@ -130,6 +140,11 @@ private ChunkMetadata(Builder builder) {
this.encryptionKeyId = builder.encryptionKeyId;
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;

if (builder.isIceberg) {
this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
}
}

/**
Expand Down Expand Up @@ -200,4 +215,16 @@ Long getFirstInsertTimeInMs() {
Long getLastInsertTimeInMs() {
return this.lastInsertTimeInMs;
}

/**
 * @return the parquet major version recorded for this chunk, serialized as {@code major_vers}.
 *     Only set for Iceberg chunks (see constructor); {@code NON_DEFAULT} inclusion omits the
 *     field from the register-blob request when it is null.
 */
@JsonProperty("major_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Integer getMajorVersion() {
return this.parquetMajorVersion;
}

/**
 * @return the parquet minor version recorded for this chunk, serialized as {@code minor_vers}.
 *     Only set for Iceberg chunks (see constructor); {@code NON_DEFAULT} inclusion omits the
 *     field from the register-blob request when it is null.
 */
@JsonProperty("minor_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Integer getMinorVersion() {
return this.parquetMinorVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,27 @@ class FileColumnProperties {
public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;

/**
 * Builds file column properties from the given stats, substituting the default min/max EP
 * values for any stats that are null (legacy behavior: delegates with setDefaultValues=true).
 *
 * @param stats the per-column row buffer statistics to convert
 */
FileColumnProperties(RowBufferStats stats) {
this(stats, true);
}

FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
this.setCollation(stats.getCollationDefinitionString());
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
: stats.getCurrentMaxIntValue());
this.setMinIntValue(
stats.getCurrentMinIntValue() == null
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
: stats.getCurrentMinIntValue());
this.setMinRealValue(
stats.getCurrentMinRealValue() == null
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
: stats.getCurrentMinRealValue());
this.setMaxRealValue(
stats.getCurrentMaxRealValue() == null
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
: stats.getCurrentMaxRealValue());
this.setMaxLength(stats.getCurrentMaxLength());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ BlobMetadata buildAndUpload(
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption(),
this.owningClient.getInternalParameterProvider().getIsIcebergMode());

blob.blobStats.setBuildDurationMs(buildContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ boolean getEnableChunkEncryption() {
// mode does not need client-side encryption.
return !isIcebergMode;
}

/** @return true when the client is running in Iceberg mode (same flag that disables chunk encryption above). */
boolean getIsIcebergMode() {
return isIcebergMode;
}
}
3 changes: 3 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

public enum WriteMode {
CLOUD_STORAGE,
REST_API,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public BdecParquetWriter(

/** @return List of row counts per block stored in the parquet footer */
public List<Long> getRowCountsFromFooter() {
if (writer.getFooter().getBlocks().size() > 1) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Expecting only one row group in the parquet file, but found "
+ writer.getFooter().getBlocks().size());
}
final List<Long> blockRowCounts = new ArrayList<>();
for (BlockMetaData metadata : writer.getFooter().getBlocks()) {
blockRowCounts.add(metadata.getRowCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ public void testSerializationErrors() throws Exception {
"a.bdec",
Collections.singletonList(createChannelDataPerTable(1)),
Constants.BdecVersion.THREE,
encrypt);
encrypt,
!encrypt);

// Construction fails if metadata contains 0 rows and data 1 row
try {
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Collections.singletonList(createChannelDataPerTable(0)),
Constants.BdecVersion.THREE,
encrypt);
encrypt,
!encrypt);
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ public void testBuildAndUpload() throws Exception {

EpInfo expectedChunkEpInfo =
AbstractRowBuffer.buildEpInfoFromStats(
3, ChannelData.getCombinedColumnStatsMap(eps1, eps2));
3, ChannelData.getCombinedColumnStatsMap(eps1, eps2), !isIcebergMode);

ChannelMetadata expectedChannel1Metadata =
ChannelMetadata.builder()
Expand Down Expand Up @@ -1110,7 +1110,7 @@ public void testBlobBuilder() throws Exception {

stats1.addIntValue(new BigInteger("10"));
stats1.addIntValue(new BigInteger("15"));
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1);
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(2, eps1, !isIcebergMode);

ChannelMetadata channelMetadata =
ChannelMetadata.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public void testBuildEpInfoFromStats() {
colStats.put("intColumn", stats1);
colStats.put("strColumn", stats2);

EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats);
EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
Map<String, FileColumnProperties> columnResults = result.getColumnEps();
Assert.assertEquals(2, columnResults.keySet().size());

Expand Down Expand Up @@ -610,25 +610,29 @@ public void testBuildEpInfoFromNullColumnStats() {
colStats.put(intColName, stats1);
colStats.put(realColName, stats2);

EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats);
EpInfo result = AbstractRowBuffer.buildEpInfoFromStats(2, colStats, !isIcebergMode);
Map<String, FileColumnProperties> columnResults = result.getColumnEps();
Assert.assertEquals(2, columnResults.keySet().size());

FileColumnProperties intColumnResult = columnResults.get(intColName);
Assert.assertEquals(-1, intColumnResult.getDistinctValues());
Assert.assertEquals(
FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue());
isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
intColumnResult.getMinIntValue());
Assert.assertEquals(
FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMaxIntValue());
isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
intColumnResult.getMaxIntValue());
Assert.assertEquals(1, intColumnResult.getNullCount());
Assert.assertEquals(0, intColumnResult.getMaxLength());

FileColumnProperties realColumnResult = columnResults.get(realColName);
Assert.assertEquals(-1, intColumnResult.getDistinctValues());
Assert.assertEquals(
FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue());
isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
realColumnResult.getMinRealValue());
Assert.assertEquals(
FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMaxRealValue());
isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
realColumnResult.getMaxRealValue());
Assert.assertEquals(1, realColumnResult.getNullCount());
Assert.assertEquals(0, realColumnResult.getMaxLength());
}
Expand All @@ -651,7 +655,7 @@ public void testInvalidEPInfo() {
colStats.put("strColumn", stats2);

try {
AbstractRowBuffer.buildEpInfoFromStats(1, colStats);
AbstractRowBuffer.buildEpInfoFromStats(1, colStats, !isIcebergMode);
fail("should fail when row count is smaller than null count.");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {

Map<String, RowBufferStats> columnEps = new HashMap<>();
columnEps.put("column", new RowBufferStats("COL1"));
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps);
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);

ChunkMetadata chunkMetadata =
ChunkMetadata.builder()
Expand Down Expand Up @@ -616,7 +616,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
private Pair<List<BlobMetadata>, Set<ChunkRegisterStatus>> getRetryBlobMetadata() {
Map<String, RowBufferStats> columnEps = new HashMap<>();
columnEps.put("column", new RowBufferStats("COL1"));
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps);
EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);

ChannelMetadata channelMetadata1 =
ChannelMetadata.builder()
Expand Down

0 comments on commit a29eca9

Please sign in to comment.