Skip to content

Commit

Permalink
SNOW-1673203 EP info generation for new table format (#836)
Browse files Browse the repository at this point in the history
1. Allow EP information to include parquet major version / minor version
2. Allow EP information to have nulls in minIntValue / maxIntValue / minRealValue / maxRealValue instead of zeroes
3. Add validation for max row group count and throw if > 1

All three of these are selectively enabled only when isIcebergMode is true.
---------

Co-authored-by: Hitesh Madan <[email protected]>
  • Loading branch information
sfc-gh-alhuang and sfc-gh-hmadan authored Sep 23, 2024
1 parent 601d7eb commit 3f9b959
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,15 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setDefaultValues: whether to set default values for null fields the EPs
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> colStats) {
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
FileColumnProperties dto = new FileColumnProperties(stat);
FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ class BlobBuilder {
* @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
* belongs to the same table. Will error if this is not the case
* @param bdecVersion version of blob
* @param encrypt If the output chunk is encrypted or not
* @return {@link Blob} data
*/
static <T> Blob constructBlobAndMetadata(
String filePath,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
boolean encrypt)
InternalParameterProvider internalParameterProvider)
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
BadPaddingException {
Expand All @@ -91,7 +90,7 @@ static <T> Blob constructBlobAndMetadata(
final int chunkLength;
final int compressedChunkDataSize;

if (encrypt) {
if (internalParameterProvider.getEnableChunkEncryption()) {
Pair<byte[], Integer> paddedChunk =
padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
byte[] paddedChunkData = paddedChunk.getFirst();
Expand Down Expand Up @@ -133,9 +132,12 @@ static <T> Blob constructBlobAndMetadata(
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
.setEpInfo(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.setMajorMinorVersionInEp(internalParameterProvider.setMajorMinorVersionInEp())
.build();

// Add chunk metadata and data to the list
Expand All @@ -147,15 +149,15 @@ static <T> Blob constructBlobAndMetadata(
logger.logInfo(
"Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
+ " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
+ " encryption={}, bdecVersion={}",
+ " encrypt={}, bdecVersion={}",
filePath,
firstChannelFlushContext.getFullyQualifiedTableName(),
serializedChunk.rowCount,
startOffset,
serializedChunk.chunkEstimatedUncompressedSize,
chunkLength,
compressedChunkDataSize,
encrypt,
internalParameterProvider.getEnableChunkEncryption(),
bdecVersion);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Utils;

/** Metadata for a chunk that sends to Snowflake as part of the register blob request */
Expand All @@ -22,6 +24,8 @@ class ChunkMetadata {
private final Long encryptionKeyId;
private final Long firstInsertTimeInMs;
private final Long lastInsertTimeInMs;
private Integer parquetMajorVersion;
private Integer parquetMinorVersion;

static Builder builder() {
return new Builder();
Expand All @@ -43,6 +47,7 @@ static class Builder {
private Long encryptionKeyId;
private Long firstInsertTimeInMs;
private Long lastInsertTimeInMs;
private boolean setMajorMinorVersionInEp;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
this.dbName = channelFlushContext.getDbName();
Expand Down Expand Up @@ -100,6 +105,11 @@ Builder setLastInsertTimeInMs(Long lastInsertTimeInMs) {
return this;
}

Builder setMajorMinorVersionInEp(boolean setMajorMinorVersionInEp) {
this.setMajorMinorVersionInEp = setMajorMinorVersionInEp;
return this;
}

ChunkMetadata build() {
return new ChunkMetadata(this);
}
Expand Down Expand Up @@ -130,6 +140,11 @@ private ChunkMetadata(Builder builder) {
this.encryptionKeyId = builder.encryptionKeyId;
this.firstInsertTimeInMs = builder.firstInsertTimeInMs;
this.lastInsertTimeInMs = builder.lastInsertTimeInMs;

if (builder.setMajorMinorVersionInEp) {
this.parquetMajorVersion = Constants.PARQUET_MAJOR_VERSION;
this.parquetMinorVersion = Constants.PARQUET_MINOR_VERSION;
}
}

/**
Expand Down Expand Up @@ -200,4 +215,18 @@ Long getFirstInsertTimeInMs() {
Long getLastInsertTimeInMs() {
return this.lastInsertTimeInMs;
}

// Snowflake service had a bug that did not allow the client to add new json fields in some
// contracts; thus these new fields have a NON_DEFAULT attribute.
@JsonProperty("major_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Integer getMajorVersion() {
return this.parquetMajorVersion;
}

@JsonProperty("minor_vers")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Integer getMinorVersion() {
return this.parquetMinorVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal;

import java.util.Optional;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

Expand All @@ -18,6 +19,8 @@ public class ClientBufferParameters {

private Constants.BdecParquetCompression bdecParquetCompression;

private final Optional<Integer> maxRowGroups;

private boolean isIcebergMode;

/**
Expand All @@ -32,11 +35,13 @@ private ClientBufferParameters(
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.isIcebergMode = isIcebergMode;
}

Expand All @@ -62,6 +67,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
this.maxRowGroups =
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
: Optional.empty();
}

/**
Expand All @@ -75,12 +84,14 @@ public static ClientBufferParameters test_createClientBufferParameters(
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
isIcebergMode);
}

Expand All @@ -103,4 +114,8 @@ public boolean isEnableNewJsonParsingLogic() {
public boolean getIsIcebergMode() {
return isIcebergMode;
}

public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ class FileColumnProperties {
// Default value to use for min/max real when all data in the given column is NULL
public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;

FileColumnProperties(RowBufferStats stats) {
FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
this.setCollation(stats.getCollationDefinitionString());
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
: stats.getCurrentMaxIntValue());
this.setMinIntValue(
stats.getCurrentMinIntValue() == null
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
: stats.getCurrentMinIntValue());
this.setMinRealValue(
stats.getCurrentMinRealValue() == null
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
: stats.getCurrentMinRealValue());
this.setMaxRealValue(
stats.getCurrentMaxRealValue() == null
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
: stats.getCurrentMaxRealValue());
this.setMaxLength(stats.getCurrentMaxLength());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,13 +597,11 @@ BlobMetadata buildAndUpload(
InvalidKeyException {
Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
blobPath.fileName, blobData, bdecVersion, paramProvider);

blob.blobStats.setBuildDurationMs(buildContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

/** A class to provide non-configurable constants depends on Iceberg or non-Iceberg mode */
class InternalParameterProvider {
public static final Integer MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT = 1;

private final boolean isIcebergMode;

InternalParameterProvider(boolean isIcebergMode) {
Expand All @@ -17,4 +19,16 @@ boolean getEnableChunkEncryption() {
// mode does not need client-side encryption.
return !isIcebergMode;
}

boolean setDefaultValuesInEp() {
// When in Iceberg mode, we need to populate nulls (instead of zeroes) in the minIntValue /
// maxIntValue / minRealValue / maxRealValue fields of the EP Metadata.
return !isIcebergMode;
}

boolean setMajorMinorVersionInEp() {
// When in Iceberg mode, we need to explicitly populate the major and minor version of parquet
// in the EP metadata.
return isIcebergMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
Expand All @@ -27,16 +28,19 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private static final Logging logger = new Logging(ParquetFlusher.class);
private final MessageType schema;
private final long maxChunkSizeInBytes;
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;

/** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
}

Expand Down Expand Up @@ -126,6 +130,7 @@ private SerializationResult serializeFromJavaObjects(
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
maxRowGroups,
bdecParquetCompression);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ public Flusher<ParquetChunkData> createFlusher() {
return new ParquetFlusher(
schema,
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression());
}
}
3 changes: 3 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

public enum WriteMode {
CLOUD_STORAGE,
REST_API,
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
Expand Down Expand Up @@ -39,6 +40,10 @@
public class BdecParquetWriter implements AutoCloseable {
private final InternalParquetRecordWriter<List<Object>> writer;
private final CodecFactory codecFactory;

// Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end
// up throwing
private final Optional<Integer> maxRowGroups;
private long rowsWritten = 0;

/**
Expand All @@ -48,6 +53,8 @@ public class BdecParquetWriter implements AutoCloseable {
* @param schema row schema
* @param extraMetaData extra metadata
* @param channelName name of the channel that is using the writer
* @param maxRowGroups Optional cap on the max number of row groups to allow per file, if this is
* exceeded we'll end up throwing
* @throws IOException
*/
public BdecParquetWriter(
Expand All @@ -56,9 +63,11 @@ public BdecParquetWriter(
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
this.maxRowGroups = maxRowGroups;
ParquetProperties encodingProps = createParquetProperties();
Configuration conf = new Configuration();
WriteSupport<List<Object>> writeSupport =
Expand Down Expand Up @@ -107,6 +116,14 @@ public BdecParquetWriter(

/** @return List of row counts per block stored in the parquet footer */
public List<Long> getRowCountsFromFooter() {
if (maxRowGroups.isPresent() && writer.getFooter().getBlocks().size() > maxRowGroups.get()) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"Expecting only %d row group in the parquet file, but found %d",
maxRowGroups.get(), writer.getFooter().getBlocks().size()));
}

final List<Long> blockRowCounts = new ArrayList<>();
for (BlockMetaData metadata : writer.getFooter().getBlocks()) {
blockRowCounts.add(metadata.getRowCount());
Expand Down
Loading

0 comments on commit 3f9b959

Please sign in to comment.