SNOW-1727532 Set number of values for repeated fields #861

Merged · 6 commits · Oct 18, 2024
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -292,6 +292,9 @@ public InsertValidationResponse insertRows(
// Temp stats map to use until all the rows are validated
@VisibleForTesting Map<String, RowBufferStats> tempStatsMap;

// Map of the column name to the column object, used for null/missing column check
protected final Map<String, ParquetColumn> fieldIndex;

// Lock used to protect the buffers from concurrent read/write
private final Lock flushLock;

@@ -352,6 +355,8 @@ public InsertValidationResponse insertRows(
// Initialize empty stats
this.statsMap = new HashMap<>();
this.tempStatsMap = new HashMap<>();

this.fieldIndex = new HashMap<>();
}

/**
@@ -427,7 +432,7 @@ Set<String> verifyInputColumns(
List<String> missingCols = new ArrayList<>();
for (String columnName : this.nonNullableFieldNames) {
if (!inputColNamesMap.containsKey(columnName)) {
missingCols.add(statsMap.get(columnName).getColumnDisplayName());
missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
Collaborator: can we remove the getColumnDisplayName() method from the RowBufferStats class now?

Collaborator: actually, is there a need to even make this change?

Contributor Author: If a root column is a structured data type, statsMap.get(columnName) will throw an NPE because we only store stats for leaf columns. Use fieldIndex instead for logging to avoid this.

Collaborator: please add this as a code comment.

}
}
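
A minimal sketch of what the requested code comment could look like, following the loop shown in this hunk (the wording is illustrative, not necessarily what was committed):

    // Use fieldIndex rather than statsMap to build the error message: statsMap only
    // holds stats for leaf columns, so for a structured root column
    // statsMap.get(columnName) would return null and throw an NPE here.
    for (String columnName : this.nonNullableFieldNames) {
      if (!inputColNamesMap.containsKey(columnName)) {
        missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
      }
    }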

@@ -447,7 +452,7 @@ Set<String> verifyInputColumns(
for (String columnName : this.nonNullableFieldNames) {
if (inputColNamesMap.containsKey(columnName)
&& row.get(inputColNamesMap.get(columnName)) == null) {
nullValueNotNullCols.add(statsMap.get(columnName).getColumnDisplayName());
nullValueNotNullCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

@@ -642,13 +647,17 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setAllDefaultValues: whether to set default values for all null fields the EPs
* irrespective of the data type of this column
* @param setAllDefaultValues: whether to set default values for all null min/max field in the EPs
* @param enableDistinctValuesCount: whether to include valid NDV in the EPs irrespective of the
* data type of this column
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setAllDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
long rowCount,
Map<String, RowBufferStats> colStats,
boolean setAllDefaultValues,
boolean enableDistinctValuesCount) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount);
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues);
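For reference, the updated signature is exercised later in this diff inside constructBlobAndMetadata; a minimal call sketch with illustrative argument values:

    EpInfo epInfo =
        AbstractRowBuffer.buildEpInfoFromStats(
            serializedChunk.rowCount,
            serializedChunk.columnEpStatsMapCombined,
            /* setAllDefaultValues */ false,
            /* enableDistinctValuesCount */ true);
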
@@ -11,6 +11,7 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Utils.getParquetFooterSize;
import static net.snowflake.ingest.utils.Utils.toByteArray;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -29,10 +30,11 @@
import javax.crypto.NoSuchPaddingException;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Cryptor;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
@@ -135,17 +137,27 @@ static <T> Blob constructBlobAndMetadata(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setAllDefaultValuesInEp()))
internalParameterProvider.setAllDefaultValuesInEp(),
internalParameterProvider.isEnableDistinctValuesCount()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
if (internalParameterProvider.getEnableChunkEncryption()) {
/* metadata size computation only works when encryption and padding are off */
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Metadata size computation is only supported when encryption is disabled");
}
final long metadataSize = getParquetFooterSize(compressedChunkData);
final long extendedMetadataSize = serializedChunk.extendedMetadataSize;
chunkMetadataBuilder
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize(-1L);
.setMetadataSize(metadataSize)
.setExtendedMetadataSize(extendedMetadataSize);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
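
Utils.getParquetFooterSize is used above, but its body is not part of this diff. As a hedged sketch only, based on the Parquet file layout (a file ends with the Thrift-encoded footer, a 4-byte little-endian footer length, and the "PAR1" magic), such a helper could look like the following; the SDK's actual implementation may differ:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    static long parquetFooterSizeSketch(byte[] parquetBytes) {
      // The 4 bytes immediately before the trailing "PAR1" magic hold the footer length.
      return ByteBuffer.wrap(parquetBytes, parquetBytes.length - 8, 4)
          .order(ByteOrder.LITTLE_ENDIAN)
          .getInt();
    }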
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -26,6 +26,7 @@ class ChunkMetadata {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

static Builder builder() {
@@ -51,6 +52,7 @@ static class Builder {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
@@ -124,6 +126,11 @@ Builder setCreatedOn(Long createdOn) {
return this;
}

Builder setMetadataSize(Long metadataSize) {
this.metadataSize = metadataSize;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
@@ -165,6 +172,7 @@ private ChunkMetadata(Builder builder) {
this.majorVersion = builder.majorVersion;
this.minorVersion = builder.minorVersion;
this.createdOn = builder.createdOn;
this.metadataSize = builder.metadataSize;
this.extendedMetadataSize = builder.extendedMetadataSize;
}

@@ -258,6 +266,12 @@ Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getMetadataSize() {
return this.metadataSize;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
@@ -25,6 +25,10 @@ public class ClientBufferParameters {

private boolean isIcebergMode;

private boolean enableDistinctValuesCount;

private boolean enableValuesCount;

/**
* Private constructor used for test methods
*
@@ -38,13 +42,17 @@ private ClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.isIcebergMode = isIcebergMode;
this.enableDistinctValuesCount = enableDistinctValuesCount;
this.enableValuesCount = enableValuesCount;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
Collaborator: I've never understood why clientInternal can or should be allowed to be null. Looks like a testcase author leaking their convenience needs into production code.

Contributor Author: The client internal test needs this.

Collaborator: there's another ctor for testcases to pass in whatever booleans they want, so still unclear why there's a need to pass in a null clientInternal to this ctor.

@@ -73,6 +81,14 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
: Optional.empty();
this.enableDistinctValuesCount =
clientInternal != null
? clientInternal.getInternalParameterProvider().isEnableDistinctValuesCount()
: InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT;
this.enableValuesCount =
clientInternal != null
? clientInternal.getInternalParameterProvider().isEnableValuesCount()
: InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT;
}

/**
@@ -87,14 +103,18 @@ public static ClientBufferParameters test_createClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
isIcebergMode);
isIcebergMode,
enableDistinctValuesCount,
enableValuesCount);
}

public long getMaxChunkSizeInBytes() {
@@ -125,6 +145,14 @@ public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public boolean isEnableDistinctValuesCount() {
return enableDistinctValuesCount;
}

public boolean isEnableValuesCount() {
return enableValuesCount;
}

public boolean isEnableDictionaryEncoding() {
return isIcebergMode;
}
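
A hedged usage sketch of the extended test factory method above; the argument values (sizes, compression, flags) are illustrative assumptions, and only the parameter order comes from this diff:

    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            16 * 1024 * 1024, // maxChunkSizeInBytes
            32 * 1024 * 1024, // maxAllowedRowSizeInBytes
            Constants.BdecParquetCompression.GZIP, // compression choice is illustrative
            true, // enableNewJsonParsingLogic
            Optional.empty(), // maxRowGroups
            true, // isIcebergMode
            true, // enableDistinctValuesCount
            true); // enableValuesCount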
@@ -13,12 +13,18 @@ class EpInfo {

private Map<String, FileColumnProperties> columnEps;

private boolean enableDistinctValuesCount;

/** Default constructor, needed for Jackson */
EpInfo() {}

EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps) {
EpInfo(
long rowCount,
Map<String, FileColumnProperties> columnEps,
boolean enableDistinctValuesCount) {
this.rowCount = rowCount;
this.columnEps = columnEps;
this.enableDistinctValuesCount = enableDistinctValuesCount;
}

/** Some basic verification logic to make sure the EP info is correct */
@@ -35,8 +41,8 @@ public void verifyEpInfo() {
colName, colEp.getNullCount(), rowCount));
}

// Make sure the NDV should always be -1
if (colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
// Make sure the NDV is always -1 (unknown) when distinct values counting is disabled
if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
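A hedged sketch of how the constructor and the check above interact (names taken from this diff, values illustrative):

    EpInfo epInfo = new EpInfo(rowCount, columnEps, /* enableDistinctValuesCount */ false);
    // With distinct-values counting disabled, verifyEpInfo() throws SFException for any
    // column whose NDV differs from EP_NDV_UNKNOWN (-1); with it enabled, real NDV
    // values pass through this check.
    epInfo.verifyEpInfo();
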
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -44,6 +45,9 @@ class FileColumnProperties {

private long nullCount;

// for elements in repeated columns
private Long numberOfValues;

// for binary or string columns
private long maxLength;

@@ -110,6 +114,10 @@ class FileColumnProperties {
this.setMinStrNonCollated(null);
this.setNullCount(stats.getCurrentNullCount());
this.setDistinctValues(stats.getDistinctValues());

if (stats.getNumberOfValues() != EP_NV_UNKNOWN) {
this.setNumberOfValues(stats.getNumberOfValues());
}
}

private void setIntValues(RowBufferStats stats) {
@@ -284,6 +292,16 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
this.maxStrNonCollated = maxStrNonCollated;
}

@JsonProperty("numberOfValues")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getNumberOfValues() {
return numberOfValues;
}

void setNumberOfValues(Long numberOfValues) {
this.numberOfValues = numberOfValues;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
Expand All @@ -306,6 +324,7 @@ public String toString() {
}
sb.append(", \"distinctValues\": ").append(distinctValues);
sb.append(", \"nullCount\": ").append(nullCount);
sb.append(", \"numberOfValues\": ").append(numberOfValues);
return sb.append('}').toString();
}

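A hedged illustration of how the new field behaves end to end, inferred from the guard and Jackson annotations above:

    // numberOfValues is set only when stats.getNumberOfValues() != EP_NV_UNKNOWN, i.e. for
    // repeated (list/map) columns; for scalar columns it stays null, and the
    // @JsonInclude(NON_NULL) rule keeps "numberOfValues" out of the serialized EP metadata.
    FileColumnProperties props = new FileColumnProperties(stats, /* setAllDefaultValues */ false);
    Long numberOfValues = props.getNumberOfValues(); // null for scalars, element count otherwise
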
@@ -597,11 +597,13 @@ BlobMetadata buildAndUpload(
InvalidKeyException {
Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName, blobData, bdecVersion, paramProvider);
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider());

blob.blobStats.setBuildDurationMs(buildContext);

@@ -36,20 +36,23 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final long extendedMetadataSize;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
long extendedMetadataSize) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.extendedMetadataSize = extendedMetadataSize;
}
}
}