Skip to content

Commit

Permalink
Address comments w/o ndv
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 4, 2024
1 parent 3ef63c0 commit 2875a66
Show file tree
Hide file tree
Showing 25 changed files with 288 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,21 +641,19 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setDefaultValues: whether to set default values for null fields the EPs
* @param setDefaultValues: whether to set default values for null fields and NDV in the EPs
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
boolean enableDistinctValues = false;
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), !setDefaultValues);
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
enableDistinctValues = stat.isEnableDistinctValue();
FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
epInfo.verifyEpInfo(enableDistinctValues);
epInfo.verifyEpInfo();
return epInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
Expand Down Expand Up @@ -98,13 +99,6 @@ static <T> Blob constructBlobAndMetadata(
byte[] paddedChunkData = paddedChunk.getFirst();
chunkLength = paddedChunk.getSecond();

if (internalParameterProvider.getComputeExtendedMetadataSize()) {
extendedMetadataSize =
Utils.getLittleEndianInt(
paddedChunkData,
chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES);
}

// Encrypt the compressed chunk data, the encryption key is derived using the key from
// server with the full blob path.
// We need to maintain IV as a block counter for the whole file, even interleaved,
Expand All @@ -120,11 +114,8 @@ static <T> Blob constructBlobAndMetadata(
chunkLength = compressedChunkData.length;
compressedChunkDataSize = chunkLength;

if (internalParameterProvider.getComputeExtendedMetadataSize()) {
extendedMetadataSize =
Utils.getLittleEndianInt(
compressedChunkData,
chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES);
if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength);
}
}

Expand Down Expand Up @@ -156,7 +147,7 @@ static <T> Blob constructBlobAndMetadata(

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* Channel immutable identification and encryption attributes.
Expand All @@ -29,14 +30,17 @@ class ChannelFlushContext {
// Data encryption key id
private final Long encryptionKeyId;

private final ParquetProperties.WriterVersion parquetWriterVersion;

ChannelFlushContext(
String name,
String dbName,
String schemaName,
String tableName,
Long channelSequencer,
String encryptionKey,
Long encryptionKeyId) {
Long encryptionKeyId,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.name = name;
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
Expand All @@ -47,6 +51,7 @@ class ChannelFlushContext {
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
this.parquetWriterVersion = parquetWriterVersion;
}

@Override
Expand Down Expand Up @@ -115,4 +120,8 @@ String getEncryptionKey() {
Long getEncryptionKeyId() {
return encryptionKeyId;
}

ParquetProperties.WriterVersion getParquetWriterVersion() {
return parquetWriterVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Optional;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.apache.parquet.column.ParquetProperties;

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
Expand All @@ -24,10 +23,6 @@ public class ClientBufferParameters {

private final Optional<Integer> maxRowGroups;

private ParquetProperties.WriterVersion parquetWriterVersion;

private boolean enableDictionaryEncoding;

private boolean isIcebergMode;

/**
Expand All @@ -43,16 +38,12 @@ private ClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding,
boolean isIcebergMode) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
this.isIcebergMode = isIcebergMode;
}

Expand All @@ -74,14 +65,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.parquetWriterVersion =
clientInternal != null
? clientInternal.getParameterProvider().getParquetWriterVersion()
: ParameterProvider.PARQUET_WRITER_VERSION_DEFAULT;
this.enableDictionaryEncoding =
clientInternal != null
? clientInternal.getParameterProvider().isEnableParquetDictionaryEncoding()
: ParameterProvider.ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT;
this.maxRowGroups =
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
Expand All @@ -104,17 +87,13 @@ public static ClientBufferParameters test_createClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding,
boolean isIcebergMode) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
parquetWriterVersion,
enableDictionaryEncoding,
isIcebergMode);
}

Expand Down Expand Up @@ -146,11 +125,7 @@ public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public ParquetProperties.WriterVersion getParquetWriterVersion() {
return parquetWriterVersion;
}

public boolean isEnableDictionaryEncoding() {
return enableDictionaryEncoding;
public boolean isEnableDistinctValues() {
return isIcebergMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ class EpInfo {

private Map<String, FileColumnProperties> columnEps;

private boolean enableDistinctValues;

/** Default constructor, needed for Jackson */
EpInfo() {}

EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps) {
EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps, boolean enableDistinctValues) {
this.rowCount = rowCount;
this.columnEps = columnEps;
this.enableDistinctValues = enableDistinctValues;
}

/** Some basic verification logic to make sure the EP info is correct */
public void verifyEpInfo(boolean enableDistinctValues) {
public void verifyEpInfo() {
for (Map.Entry<String, FileColumnProperties> entry : columnEps.entrySet()) {
String colName = entry.getKey();
FileColumnProperties colEp = entry.getValue();
Expand All @@ -35,7 +38,7 @@ public void verifyEpInfo(boolean enableDistinctValues) {
colName, colEp.getNullCount(), rowCount));
}

// Make sure the NDV should always be -1 when the NDV is not enabled
// Make sure the NDV should always be -1 when the NDV set to default
if (!enableDistinctValues && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,4 @@ boolean setIcebergSpecificFieldsInEp() {
// in the EP metadata, createdOn, and extendedMetadataSize.
return isIcebergMode;
}

boolean getComputeExtendedMetadataSize() {
// When in Iceberg mode, extendedMetadataSize is computed. Otherwise, it is -1.
return isIcebergMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;

/** Response to the OpenChannelRequest */
class OpenChannelResponse extends StreamingIngestResponse {
Expand All @@ -22,6 +23,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo icebergLocationInfo;
private String icebergSerializationPolicy;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
Expand Down Expand Up @@ -140,4 +142,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) {
FileLocationInfo getIcebergLocationInfo() {
return this.icebergLocationInfo;
}

@JsonProperty("iceberg_serialization_policy")
void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
this.icebergSerializationPolicy = icebergSerializationPolicy;
}

Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() {
return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;

/** Construct parquet flusher from its schema. */
Expand All @@ -40,13 +39,11 @@ public ParquetFlusher(
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
}

Expand All @@ -69,6 +66,7 @@ private SerializationResult serializeFromJavaObjects(
BdecParquetWriter parquetWriter;
ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
ParquetProperties.WriterVersion parquetWriterVersion = null;

for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
// Create channel metadata
Expand Down Expand Up @@ -110,6 +108,15 @@ private SerializationResult serializeFromJavaObjects(
chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
}

// Check if all the channels have the same parquet writer version
if (parquetWriterVersion == null) {
parquetWriterVersion = data.getChannelContext().getParquetWriterVersion();
} else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Parquet writer version and storage serialization policy mismatch within a chunk");
}

rows.addAll(data.getVectors().rows);

rowCount += data.getRowCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
column.getCollation(),
column.getOrdinal(),
null /* fieldId */,
clientBufferParameters.getIsIcebergMode()));
clientBufferParameters.isEnableDistinctValues()));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
Expand All @@ -121,7 +121,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
column.getCollation(),
column.getOrdinal(),
null /* fieldId */,
clientBufferParameters.getIsIcebergMode()));
clientBufferParameters.isEnableDistinctValues()));
}
}

Expand Down Expand Up @@ -195,7 +195,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
null /* collationDefinitionString */,
ordinal,
fieldId,
clientBufferParameters.getIsIcebergMode()));
clientBufferParameters.isEnableDistinctValues()));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
Expand All @@ -206,7 +206,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
null /* collationDefinitionString */,
ordinal,
fieldId,
clientBufferParameters.getIsIcebergMode()));
clientBufferParameters.isEnableDistinctValues()));
}
}
}
Expand Down Expand Up @@ -399,7 +399,6 @@ public Flusher<ParquetChunkData> createFlusher() {
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression(),
clientBufferParameters.getParquetWriterVersion(),
clientBufferParameters.getIsIcebergMode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class RowBufferStats {
reset();
}

RowBufferStats(String columnDisplayName) {
this(columnDisplayName, null, -1, null, false);
RowBufferStats(String columnDisplayName, boolean enableDistinctValues) {
this(columnDisplayName, null, -1, null, enableDistinctValues);
}

void reset() {
Expand Down
Loading

0 comments on commit 2875a66

Please sign in to comment.