SNOW-1727532 Set number of values for repeated fields (#861)
sfc-gh-alhuang authored Oct 18, 2024
1 parent 6d6ba94 commit ee48554
Showing 34 changed files with 1,237 additions and 267 deletions.
==== file 1 ====
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -292,6 +292,9 @@ public InsertValidationResponse insertRows(
// Temp stats map to use until all the rows are validated
@VisibleForTesting Map<String, RowBufferStats> tempStatsMap;

// Map of the column name to the column object, used for null/missing column check
protected final Map<String, ParquetColumn> fieldIndex;

// Lock used to protect the buffers from concurrent read/write
private final Lock flushLock;

@@ -352,6 +355,8 @@ public InsertValidationResponse insertRows(
// Initialize empty stats
this.statsMap = new HashMap<>();
this.tempStatsMap = new HashMap<>();

this.fieldIndex = new HashMap<>();
}

/**
@@ -427,7 +432,7 @@ Set<String> verifyInputColumns(
List<String> missingCols = new ArrayList<>();
for (String columnName : this.nonNullableFieldNames) {
if (!inputColNamesMap.containsKey(columnName)) {
missingCols.add(statsMap.get(columnName).getColumnDisplayName());
missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

@@ -447,7 +452,7 @@ Set<String> verifyInputColumns(
for (String columnName : this.nonNullableFieldNames) {
if (inputColNamesMap.containsKey(columnName)
&& row.get(inputColNamesMap.get(columnName)) == null) {
nullValueNotNullCols.add(statsMap.get(columnName).getColumnDisplayName());
nullValueNotNullCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

@@ -642,13 +647,17 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setAllDefaultValues: whether to set default values for all null fields in the EPs
* irrespective of the data type of this column
* @param setAllDefaultValues: whether to set default values for all null min/max fields in the EPs
* @param enableDistinctValuesCount: whether to include a valid NDV in the EPs irrespective of the
* data type of this column
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setAllDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
long rowCount,
Map<String, RowBufferStats> colStats,
boolean setAllDefaultValues,
boolean enableDistinctValuesCount) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount);
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues);
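To see the new flag in context, here is a minimal sketch of how the reworked helper could be driven from inside the package. The driver class is hypothetical; the buildEpInfoFromStats signature and the public verifyEpInfo method match the diff above.

    package net.snowflake.ingest.streaming.internal;

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical driver class, not part of this commit.
    class BuildEpInfoSketch {
      public static void main(String[] args) {
        Map<String, RowBufferStats> colStats = new HashMap<>(); // no columns buffered yet
        // The new fourth argument decides whether real NDVs are allowed in the EPs.
        EpInfo epInfo =
            AbstractRowBuffer.buildEpInfoFromStats(
                0L, colStats, /* setAllDefaultValues= */ true, /* enableDistinctValuesCount= */ false);
        epInfo.verifyEpInfo(); // passes trivially: no columns to validate
      }
    }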
==== file 2 ====
@@ -11,6 +11,7 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Utils.getParquetFooterSize;
import static net.snowflake.ingest.utils.Utils.toByteArray;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -29,10 +30,11 @@
import javax.crypto.NoSuchPaddingException;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Cryptor;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
@@ -135,17 +137,27 @@ static <T> Blob constructBlobAndMetadata(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setAllDefaultValuesInEp()))
internalParameterProvider.setAllDefaultValuesInEp(),
internalParameterProvider.isEnableDistinctValuesCount()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
if (internalParameterProvider.getEnableChunkEncryption()) {
/* Metadata size computation only works when encryption and padding are off */
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Metadata size computation is only supported when encryption is disabled");
}
final long metadataSize = getParquetFooterSize(compressedChunkData);
final long extendedMetadataSize = serializedChunk.extendedMetadataSize;
chunkMetadataBuilder
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize(-1L);
.setMetadataSize(metadataSize)
.setExtendedMetadataSize(extendedMetadataSize);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
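The metadata size recorded above comes from Utils.getParquetFooterSize, whose body is not part of this diff. Assuming the chunk is a complete Parquet file held in a byte array, a stand-in could read the footer length from the fixed 8-byte trailer that the Parquet format defines (a 4-byte little-endian footer length followed by the "PAR1" magic):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    class ParquetFooterSketch {
      // Hypothetical stand-in for Utils.getParquetFooterSize (not shown in this diff).
      static long getParquetFooterSize(byte[] parquetFile) {
        // A Parquet file ends with: [footer][4-byte little-endian footer length]["PAR1"].
        int lengthFieldOffset = parquetFile.length - 8;
        return ByteBuffer.wrap(parquetFile, lengthFieldOffset, 4)
            .order(ByteOrder.LITTLE_ENDIAN)
            .getInt();
      }
    }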
==== file 3 ====
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
==== file 4 ====
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -26,6 +26,7 @@ class ChunkMetadata {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

static Builder builder() {
@@ -51,6 +52,7 @@ static class Builder {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
@@ -124,6 +126,11 @@ Builder setCreatedOn(Long createdOn) {
return this;
}

Builder setMetadataSize(Long metadataSize) {
this.metadataSize = metadataSize;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
@@ -165,6 +172,7 @@ private ChunkMetadata(Builder builder) {
this.majorVersion = builder.majorVersion;
this.minorVersion = builder.minorVersion;
this.createdOn = builder.createdOn;
this.metadataSize = builder.metadataSize;
this.extendedMetadataSize = builder.extendedMetadataSize;
}

@@ -258,6 +266,12 @@ Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getMetadataSize() {
return this.metadataSize;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
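A sketch of how the two new builder fields slot into the Iceberg-mode chain from BlobBuilder above. The helper class is hypothetical, and build() is omitted here since the remaining required fields (channel context, row count, and so on) are populated elsewhere:

    package net.snowflake.ingest.streaming.internal;

    import net.snowflake.ingest.utils.Constants;

    class ChunkMetadataSketch {
      // Hypothetical helper mirroring the Iceberg-mode branch in BlobBuilder.
      static ChunkMetadata.Builder withIcebergFields(
          ChunkMetadata.Builder builder, long footerSize, long extendedMetadataSize) {
        return builder
            .setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
            .setMinorVersion(Constants.PARQUET_MINOR_VERSION)
            .setCreatedOn(System.currentTimeMillis() / 1000) // createdOn is in seconds
            .setMetadataSize(footerSize)
            .setExtendedMetadataSize(extendedMetadataSize);
      }
    }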
==== file 5 ====
@@ -25,6 +25,10 @@ public class ClientBufferParameters {

private boolean isIcebergMode;

private boolean enableDistinctValuesCount;

private boolean enableValuesCount;

/**
* Private constructor used for test methods
*
@@ -38,13 +42,17 @@ private ClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.isIcebergMode = isIcebergMode;
this.enableDistinctValuesCount = enableDistinctValuesCount;
this.enableValuesCount = enableValuesCount;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -73,6 +81,14 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
: Optional.empty();
this.enableDistinctValuesCount =
clientInternal != null
? clientInternal.getInternalParameterProvider().isEnableDistinctValuesCount()
: InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT;
this.enableValuesCount =
clientInternal != null
? clientInternal.getInternalParameterProvider().isEnableValuesCount()
: InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT;
}

/**
@@ -87,14 +103,18 @@ public static ClientBufferParameters test_createClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
isIcebergMode);
isIcebergMode,
enableDistinctValuesCount,
enableValuesCount);
}

public long getMaxChunkSizeInBytes() {
@@ -125,6 +145,14 @@ public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public boolean isEnableDistinctValuesCount() {
return enableDistinctValuesCount;
}

public boolean isEnableValuesCount() {
return enableValuesCount;
}

public boolean isEnableDictionaryEncoding() {
return isIcebergMode;
}
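For the widened test factory, a hedged example call; the sizes are arbitrary placeholders, and GZIP is assumed to be a member of Constants.BdecParquetCompression:

    package net.snowflake.ingest.streaming.internal;

    import java.util.Optional;
    import net.snowflake.ingest.utils.Constants;

    class BufferParamsSketch {
      // Hypothetical call site, e.g. from a unit test in the same module.
      static ClientBufferParameters sampleParams() {
        return ClientBufferParameters.test_createClientBufferParameters(
            64L * 1024 * 1024, // maxChunkSizeInBytes (placeholder)
            32L * 1024 * 1024, // maxAllowedRowSizeInBytes (placeholder)
            Constants.BdecParquetCompression.GZIP,
            true, // enableNewJsonParsingLogic
            Optional.empty(), // maxRowGroups
            true, // isIcebergMode
            true, // enableDistinctValuesCount
            true); // enableValuesCount
      }
    }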
==== file 6 ====
@@ -13,12 +13,18 @@ class EpInfo {

private Map<String, FileColumnProperties> columnEps;

private boolean enableDistinctValuesCount;

/** Default constructor, needed for Jackson */
EpInfo() {}

EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps) {
EpInfo(
long rowCount,
Map<String, FileColumnProperties> columnEps,
boolean enableDistinctValuesCount) {
this.rowCount = rowCount;
this.columnEps = columnEps;
this.enableDistinctValuesCount = enableDistinctValuesCount;
}

/** Some basic verification logic to make sure the EP info is correct */
@@ -35,8 +41,8 @@ public void verifyEpInfo() {
colName, colEp.getNullCount(), rowCount));
}

// Make sure the NDV should always be -1
if (colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
// Make sure the NDV is always -1 when distinct value counting is disabled
if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
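The tightened NDV check can be restated on its own. The helper below is hypothetical; EP_NDV_UNKNOWN mirrors the -1 sentinel referenced in the comment above:

    class NdvInvariantSketch {
      static final long EP_NDV_UNKNOWN = -1L; // mirrors the constant referenced above

      // Hypothetical restatement of the tightened check in verifyEpInfo.
      static void checkNdv(boolean enableDistinctValuesCount, long ndv, String colName) {
        if (!enableDistinctValuesCount && ndv != EP_NDV_UNKNOWN) {
          throw new IllegalStateException(
              String.format("Unexpected NDV %d on column %s; expected -1", ndv, colName));
        }
      }
    }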
==== file 7 ====
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -44,6 +45,9 @@ class FileColumnProperties {

private long nullCount;

// for elements in repeated columns
private Long numberOfValues;

// for binary or string columns
private long maxLength;

@@ -110,6 +114,10 @@ class FileColumnProperties {
this.setMinStrNonCollated(null);
this.setNullCount(stats.getCurrentNullCount());
this.setDistinctValues(stats.getDistinctValues());

if (stats.getNumberOfValues() != EP_NV_UNKNOWN) {
this.setNumberOfValues(stats.getNumberOfValues());
}
}

private void setIntValues(RowBufferStats stats) {
@@ -284,6 +292,16 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
this.maxStrNonCollated = maxStrNonCollated;
}

@JsonProperty("numberOfValues")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getNumberOfValues() {
return numberOfValues;
}

void setNumberOfValues(Long numberOfValues) {
this.numberOfValues = numberOfValues;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
@@ -306,6 +324,7 @@ public String toString() {
}
sb.append(", \"distinctValues\": ").append(distinctValues);
sb.append(", \"nullCount\": ").append(nullCount);
sb.append(", \"numberOfValues\": ").append(numberOfValues);
return sb.append('}').toString();
}

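Because of the NON_NULL inclusion rule, numberOfValues appears in the serialized EP metadata only for repeated columns, where the stat is actually set, and is omitted everywhere else. A self-contained Jackson illustration with a stand-in class (not the SDK type):

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Stand-in class, not the SDK's FileColumnProperties.
    class NumberOfValuesSketch {
      @JsonProperty("numberOfValues")
      @JsonInclude(JsonInclude.Include.NON_NULL)
      Long numberOfValues; // left null for non-repeated columns

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        NumberOfValuesSketch scalarCol = new NumberOfValuesSketch();
        System.out.println(mapper.writeValueAsString(scalarCol)); // {}
        NumberOfValuesSketch repeatedCol = new NumberOfValuesSketch();
        repeatedCol.numberOfValues = 42L;
        System.out.println(mapper.writeValueAsString(repeatedCol)); // {"numberOfValues":42}
      }
    }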
==== file 8 ====
@@ -597,11 +597,13 @@ BlobMetadata buildAndUpload(
InvalidKeyException {
Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName, blobData, bdecVersion, paramProvider);
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider());

blob.blobStats.setBuildDurationMs(buildContext);

==== file 9 ====
@@ -36,20 +36,23 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final long extendedMetadataSize;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
long extendedMetadataSize) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.extendedMetadataSize = extendedMetadataSize;
}
}
}
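Finally, a sketch of threading the new field through a result object. It assumes SerializationResult is the nested type in Flusher (the indentation suggests a nested class) and that Pair exposes a two-argument constructor; neither is shown in this diff, and all other arguments are stubbed:

    package net.snowflake.ingest.streaming.internal;

    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import net.snowflake.ingest.utils.Pair;

    class SerializationResultSketch {
      // Hypothetical construction of an empty result with the new field threaded through.
      static Flusher.SerializationResult emptyResult(long extendedMetadataSize) {
        return new Flusher.SerializationResult(
            new ArrayList<>(), // channelsMetadataList
            new HashMap<>(), // columnEpStatsMapCombined
            0L, // rowCount
            0f, // chunkEstimatedUncompressedSize
            new ByteArrayOutputStream(), // chunkData
            new Pair<>(0L, 0L), // chunkMinMaxInsertTimeInMs
            extendedMetadataSize); // the new field added by this commit
      }
    }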
==== diffs for the remaining 25 of 34 changed files not loaded ====
