Skip to content

Commit

Permalink
Integration data type test for new table format (#850)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Hitesh Madan <[email protected]>
  • Loading branch information
sfc-gh-alhuang and sfc-gh-hmadan authored Oct 9, 2024
1 parent 3c94b8c commit a6339aa
Show file tree
Hide file tree
Showing 27 changed files with 1,649 additions and 93 deletions.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.26.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down Expand Up @@ -634,6 +640,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,16 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setDefaultValues: whether to set default values for null fields the EPs
* @param setAllDefaultValues: whether to set default values for all null fields in the EPs
* irrespective of the data type of this column
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
long rowCount, Map<String, RowBufferStats> colStats, boolean setAllDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ static <T> Blob constructBlobAndMetadata(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
internalParameterProvider.setAllDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ static void checkValueInRange(
static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) {
if (bytes.length != length) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Binary length mismatch: expected=%d, actual=%d, rowIndex:%d",
length, bytes.length, insertRowIndex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
Expand Down Expand Up @@ -50,45 +53,107 @@ class FileColumnProperties {
// Default value to use for min/max real when all data in the given column is NULL
public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;

FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
// Default value to use for min/max string when all data in the given Iceberg column is NULL
public static final String DEFAULT_MIN_MAX_STR_VAL_FOR_EP = "";

/**
 * Builds the per-column EP metadata from the stats accumulated while buffering rows.
 *
 * @param stats per-column stats (min/max, null count, distinct values, ...) for this column
 * @param setAllDefaultValues whether to set defaults for ALL min/max fields when a column
 *     contains only nulls, or only the fields matching the column's physical type. BDEC sets it
 *     for all but Iceberg does not.
 */
FileColumnProperties(RowBufferStats stats, boolean setAllDefaultValues) {
  this.setColumnOrdinal(stats.getOrdinal());
  this.setFieldId(stats.getFieldId());
  this.setCollation(stats.getCollationDefinitionString());

  if (setAllDefaultValues) {
    /* FDN columns: set every min/max stat to its default value when all the row values are null */
    setIntValues(stats);
    setRealValues(stats);
    setStringValues(stats, false /* replaceNullWithEmptyString */);
  } else {
    /* Iceberg columns: only set the min/max stats matching the column's physical Parquet type
     * to their default value when all the row values are null */
    switch (stats.getPrimitiveType().getPrimitiveTypeName()) {
      case BOOLEAN:
      case INT32:
      case INT64:
        setIntValues(stats);
        break;

      case FLOAT:
      case DOUBLE:
        setRealValues(stats);
        break;

      case BINARY:
        setStringValues(stats, true /* replaceNullWithEmptyString */);
        break;

      case FIXED_LEN_BYTE_ARRAY:
        // Decimal-annotated fixed-length byte arrays carry int-style min/max stats;
        // any other fixed-length binary gets string-style (hex-truncated) stats.
        if (stats.getPrimitiveType().getLogicalTypeAnnotation()
            instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
          setIntValues(stats);
        } else {
          setStringValues(stats, true /* replaceNullWithEmptyString */);
        }
        break;

      default:
        throw new SFException(
            ErrorCode.INTERNAL_ERROR,
            "Unsupported Iceberg column type: "
                + stats.getPrimitiveType().getPrimitiveTypeName());
    }
  }

  // Fields set the same way regardless of table format.
  this.setMaxLength(stats.getCurrentMaxLength());
  this.setMaxStrNonCollated(null);
  this.setMinStrNonCollated(null);
  this.setNullCount(stats.getCurrentNullCount());
  this.setDistinctValues(stats.getDistinctValues());
}

private void setIntValues(RowBufferStats stats) {
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
: stats.getCurrentMaxIntValue());

this.setMinIntValue(
stats.getCurrentMinIntValue() == null
? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
: stats.getCurrentMinIntValue());
this.setMinRealValue(
stats.getCurrentMinRealValue() == null
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
: stats.getCurrentMinRealValue());
}

private void setRealValues(RowBufferStats stats) {
this.setMaxRealValue(
stats.getCurrentMaxRealValue() == null
? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
: stats.getCurrentMaxRealValue());
this.setMaxLength(stats.getCurrentMaxLength());

this.setMaxStrNonCollated(null);
this.setMinStrNonCollated(null);

// current hex-encoded min value, truncated down to 32 bytes
if (stats.getCurrentMinStrValue() != null) {
String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMinStrValue(), false);
this.setMinStrValue(truncatedAsHex);
}
this.setMinRealValue(
stats.getCurrentMinRealValue() == null
? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
: stats.getCurrentMinRealValue());
}

private void setStringValues(RowBufferStats stats, boolean replaceNullWithEmptyString) {
// current hex-encoded max value, truncated up to 32 bytes
if (stats.getCurrentMaxStrValue() != null) {
String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMaxStrValue(), true);
String truncatedAsHex =
truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncateUp */);
this.setMaxStrValue(truncatedAsHex);
} else if (replaceNullWithEmptyString) {
this.setMaxStrValue(DEFAULT_MIN_MAX_STR_VAL_FOR_EP);
}

this.setNullCount(stats.getCurrentNullCount());
this.setDistinctValues(stats.getDistinctValues());
// current hex-encoded min value, truncated down to 32 bytes
if (stats.getCurrentMinStrValue() != null) {
String truncatedAsHex =
truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncateUp */);
this.setMinStrValue(truncatedAsHex);
} else if (replaceNullWithEmptyString) {
this.setMinStrValue(DEFAULT_MIN_MAX_STR_VAL_FOR_EP);
}
}

@JsonProperty("columnId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ boolean getEnableChunkEncryption() {
return !isIcebergMode;
}

boolean setDefaultValuesInEp() {
// When in Iceberg mode, we need to populate nulls (instead of zeroes) in the minIntValue /
// maxIntValue / minRealValue / maxRealValue fields of the EP Metadata.
/**
 * Whether the EP metadata should carry default values for EVERY stat field when a column contains
 * only nulls.
 *
 * <p>In non-Iceberg mode the stats for all data types (int/real/string) are defaulted to 0 / "".
 * In Iceberg mode only the stats relevant to the column's data type are defaulted.
 */
boolean setAllDefaultValuesInEp() {
  if (isIcebergMode) {
    // Iceberg: defaults are applied per data type, not across the board.
    return false;
  }
  return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
Expand Down Expand Up @@ -88,21 +89,26 @@ public void setupSchema(List<ColumnMetadata> columns) {
/* Set up fields using top level column information */
validateColumnCollation(column);
ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id);
parquetTypes.add(typeInfo.getParquetType());
Type parquetType = typeInfo.getParquetType();
parquetTypes.add(parquetType);
this.metadata.putAll(typeInfo.getMetadata());
int columnIndex = parquetTypes.size() - 1;
fieldIndex.put(
column.getInternalName(),
new ParquetColumn(column, columnIndex, typeInfo.getParquetType()));
fieldIndex.put(column.getInternalName(), new ParquetColumn(column, columnIndex, parquetType));

if (!column.getNullable()) {
addNonNullableFieldName(column.getInternalName());
}

if (!clientBufferParameters.getIsIcebergMode()) {
/* Streaming to FDN table doesn't support sub-columns, set up the stats here. */
this.statsMap.put(
column.getInternalName(),
new RowBufferStats(
column.getName(), column.getCollation(), column.getOrdinal(), null /* fieldId */));
column.getName(),
column.getCollation(),
column.getOrdinal(),
null /* fieldId */,
parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
Expand All @@ -116,7 +122,8 @@ public void setupSchema(List<ColumnMetadata> columns) {
column.getName(),
column.getCollation(),
column.getOrdinal(),
null /* fieldId */));
null /* fieldId */,
parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null));
}
}

Expand Down Expand Up @@ -175,24 +182,27 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (clientBufferParameters.getIsIcebergMode()) {
for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
String columnPath = concatDotPath(columnDescriptor.getPath());
PrimitiveType primitiveType = columnDescriptor.getPrimitiveType();

/* set fieldId to 0 for non-structured columns */
int fieldId =
columnDescriptor.getPath().length == 1
? 0
: columnDescriptor.getPrimitiveType().getId().intValue();
int fieldId = columnDescriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();

this.statsMap.put(
columnPath,
new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));
new RowBufferStats(
columnPath, null /* collationDefinitionString */, ordinal, fieldId, primitiveType));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
columnPath,
new RowBufferStats(
columnPath, null /* collationDefinitionString */, ordinal, fieldId));
columnPath,
null /* collationDefinitionString */,
ordinal,
fieldId,
primitiveType));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Objects;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.schema.PrimitiveType;

/** Keeps track of the active EP stats, used to generate a file EP info */
class RowBufferStats {
Expand All @@ -25,6 +26,12 @@ class RowBufferStats {
*/
private final Integer fieldId;

private final String collationDefinitionString;
/** Display name is required for the registration endpoint */
private final String columnDisplayName;
/** Primitive type of the column, only used for Iceberg columns */
private final PrimitiveType primitiveType;

private byte[] currentMinStrValue;
private byte[] currentMaxStrValue;
private BigInteger currentMinIntValue;
Expand All @@ -34,22 +41,27 @@ class RowBufferStats {
private long currentNullCount;
// for binary or string columns
private long currentMaxLength;
private final String collationDefinitionString;
/** Display name is required for the registration endpoint */
private final String columnDisplayName;

/** Creates empty stats */
RowBufferStats(
String columnDisplayName, String collationDefinitionString, int ordinal, Integer fieldId) {
String columnDisplayName,
String collationDefinitionString,
int ordinal,
Integer fieldId,
PrimitiveType primitiveType) {
this.columnDisplayName = columnDisplayName;
this.collationDefinitionString = collationDefinitionString;
this.ordinal = ordinal;
this.fieldId = fieldId;
this.primitiveType = primitiveType;
reset();
}

RowBufferStats(String columnDisplayName) {
this(columnDisplayName, null, -1, null);
this(columnDisplayName, null, -1, null, null);
}

/**
 * Creates empty stats for a column known only by display name, additionally carrying the Parquet
 * primitive type (only used for Iceberg columns; may be null otherwise).
 */
RowBufferStats(String columnDisplayName, PrimitiveType primitiveType) {
  // Delegate with no collation, ordinal -1, and no field id.
  this(columnDisplayName, null, -1, null, primitiveType);
}

void reset() {
Expand All @@ -69,7 +81,8 @@ RowBufferStats forkEmpty() {
this.getColumnDisplayName(),
this.getCollationDefinitionString(),
this.getOrdinal(),
this.getFieldId());
this.getFieldId(),
this.getPrimitiveType());
}

// TODO performance test this vs in place update
Expand All @@ -87,7 +100,8 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right
left.columnDisplayName,
left.getCollationDefinitionString(),
left.getOrdinal(),
left.getFieldId());
left.getFieldId(),
left.getPrimitiveType());

if (left.currentMinIntValue != null) {
combined.addIntValue(left.currentMinIntValue);
Expand Down Expand Up @@ -238,6 +252,10 @@ Integer getFieldId() {
return fieldId;
}

/** Returns the column's Parquet primitive type; only set for Iceberg columns, else null. */
PrimitiveType getPrimitiveType() {
  return this.primitiveType;
}

/**
* Compares two byte arrays lexicographically. If the two arrays share a common prefix then the
* lexicographic comparison is the result of comparing two elements, as if by Byte.compare(byte,
Expand Down
Loading

0 comments on commit a6339aa

Please sign in to comment.