Integration data type test for new table format #850

Merged · 11 commits · Oct 9, 2024
pom.xml (11 changes: 11 additions & 0 deletions)
@@ -395,6 +395,12 @@
</exclusion>
</exclusions>
</dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <version>3.26.3</version>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -614,6 +620,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
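Note on the dependency change: assertj-core 3.26.3 is added with test scope in both dependency lists, enabling fluent assertions in the new data type tests. A minimal sketch of the style this unlocks (test name and data are illustrative, not from this PR):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;

public class AssertjStyleSketchTest {
  @Test
  public void fluentCollectionAssertions() {
    List<String> types = Arrays.asList("INT32", "INT64", "BINARY");
    // One chained statement; failures report the full collection contents.
    assertThat(types).hasSize(3).contains("BINARY").doesNotContain("INT96");
  }
}
```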
@@ -649,7 +649,7 @@ static EpInfo buildEpInfoFromStats(
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
-      FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
+      FileColumnProperties dto = new FileColumnProperties(stat);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
@@ -1150,7 +1150,7 @@ static void checkValueInRange(
static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) {
if (bytes.length != length) {
throw new SFException(
-          ErrorCode.INVALID_FORMAT_ROW,
+          ErrorCode.INVALID_VALUE_ROW,
String.format(
"Binary length mismatch: expected=%d, actual=%d, rowIndex:%d",
length, bytes.length, insertRowIndex));
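The length-mismatch error is reclassified from INVALID_FORMAT_ROW to INVALID_VALUE_ROW: the value, not the row's format, is what's wrong. A hedged sketch of how a test could pin the new behavior down, assuming the method lives in DataValidationUtil and the test sits in the same package (the enclosing class is not shown in this hunk), and assuming the SFException message includes the formatted detail string:

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import net.snowflake.ingest.utils.SFException;
import org.junit.Test;

public class FixedLengthBinarySketchTest {
  @Test
  public void wrongLengthBinaryRaisesValueError() {
    byte[] threeBytes = {1, 2, 3};
    // Expecting a 4-byte value; the 3-byte input should fail as a value error.
    assertThatThrownBy(
            () ->
                DataValidationUtil.checkFixedLengthByteArray(
                    threeBytes, 4, 0 /* insertRowIndex */))
        .isInstanceOf(SFException.class)
        .hasMessageContaining("Binary length mismatch");
  }
}
```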
@@ -10,6 +10,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
+ import net.snowflake.ingest.utils.ErrorCode;
+ import net.snowflake.ingest.utils.SFException;
+ import org.apache.parquet.schema.LogicalTypeAnnotation;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
@@ -50,43 +53,113 @@ class FileColumnProperties {
// Default value to use for min/max real when all data in the given column is NULL
public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;

-  FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
+  // Default value to use for min/max string when all data in the given Iceberg column is NULL
+  public static final String DEFAULT_MIN_MAX_STR_VAL_FOR_EP = "";
+
+  FileColumnProperties(RowBufferStats stats) {
this.setColumnOrdinal(stats.getOrdinal());
this.setFieldId(stats.getFieldId());
this.setCollation(stats.getCollationDefinitionString());
-    this.setMaxIntValue(
-        stats.getCurrentMaxIntValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
-            : stats.getCurrentMaxIntValue());
-    this.setMinIntValue(
-        stats.getCurrentMinIntValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
-            : stats.getCurrentMinIntValue());
-    this.setMinRealValue(
-        stats.getCurrentMinRealValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
-            : stats.getCurrentMinRealValue());
-    this.setMaxRealValue(
-        stats.getCurrentMaxRealValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
-            : stats.getCurrentMaxRealValue());
-    this.setMaxLength(stats.getCurrentMaxLength());

+    if (stats.getIsIcebergColumn()) {
+      /* Only set the corresponding min/max stats to default values for Iceberg columns if all row values are null */
+      switch (stats.getPrimitiveType().getPrimitiveTypeName()) {
+        case BOOLEAN:
+        case INT32:
+        case INT64:
+          this.setMaxIntValue(
+              stats.getCurrentMaxIntValue() == null
+                  ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                  : stats.getCurrentMaxIntValue());
+          this.setMinIntValue(
+              stats.getCurrentMinIntValue() == null
+                  ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                  : stats.getCurrentMinIntValue());
+          break;
+        case FLOAT:
+        case DOUBLE:
+          this.setMaxRealValue(
+              stats.getCurrentMaxRealValue() == null
+                  ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+                  : stats.getCurrentMaxRealValue());
+          this.setMinRealValue(
+              stats.getCurrentMinRealValue() == null
+                  ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+                  : stats.getCurrentMinRealValue());
+          break;
+        case BINARY:
+          this.setMaxStrValue(
+              stats.getCurrentMaxStrValue() == null
+                  ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                  : truncateBytesAsHex(stats.getCurrentMaxStrValue(), false /* truncate down */));
+          this.setMinStrValue(
+              stats.getCurrentMinStrValue() == null
+                  ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                  : truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */));
+          break;
+        case FIXED_LEN_BYTE_ARRAY:
+          if (stats.getPrimitiveType().getLogicalTypeAnnotation()
+              instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            this.setMaxIntValue(
+                stats.getCurrentMaxIntValue() == null
+                    ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                    : stats.getCurrentMaxIntValue());
+            this.setMinIntValue(
+                stats.getCurrentMinIntValue() == null
+                    ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                    : stats.getCurrentMinIntValue());
+          } else {
+            this.setMaxStrValue(
+                stats.getCurrentMaxStrValue() == null
+                    ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                    : truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncate up */));
+            this.setMinStrValue(
+                stats.getCurrentMinStrValue() == null
+                    ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                    : truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */));
+          }
+          break;
+        default:
+          throw new SFException(
+              ErrorCode.INTERNAL_ERROR,
+              "Unsupported Iceberg column type: "
+                  + stats.getPrimitiveType().getPrimitiveTypeName());
+      }
+    } else {
+      /* Set every stat to its default value for FDN columns if all row values are null */
+      this.setMaxIntValue(
+          stats.getCurrentMaxIntValue() == null
+              ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+              : stats.getCurrentMaxIntValue());
+      this.setMinIntValue(
+          stats.getCurrentMinIntValue() == null
+              ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+              : stats.getCurrentMinIntValue());
+      this.setMaxRealValue(
+          stats.getCurrentMaxRealValue() == null
+              ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+              : stats.getCurrentMaxRealValue());
+      this.setMinRealValue(
+          stats.getCurrentMinRealValue() == null
+              ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+              : stats.getCurrentMinRealValue());
+
+      // current hex-encoded max value, truncated up to 32 bytes
+      if (stats.getCurrentMaxStrValue() != null) {
+        String truncatedAsHex =
+            truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncate up */);
+        this.setMaxStrValue(truncatedAsHex);
+      }
+      // current hex-encoded min value, truncated down to 32 bytes
+      if (stats.getCurrentMinStrValue() != null) {
+        String truncatedAsHex =
+            truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */);
+        this.setMinStrValue(truncatedAsHex);
+      }
+    }
+    this.setMaxLength(stats.getCurrentMaxLength());
this.setMaxStrNonCollated(null);
this.setMinStrNonCollated(null);

-    // current hex-encoded min value, truncated down to 32 bytes
-    if (stats.getCurrentMinStrValue() != null) {
-      String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMinStrValue(), false);
-      this.setMinStrValue(truncatedAsHex);
-    }
-
-    // current hex-encoded max value, truncated up to 32 bytes
-    if (stats.getCurrentMaxStrValue() != null) {
-      String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMaxStrValue(), true);
-      this.setMaxStrValue(truncatedAsHex);
-    }

this.setNullCount(stats.getCurrentNullCount());
this.setDistinctValues(stats.getDistinctValues());
}
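To summarize the reworked constructor: FDN columns now always receive defaults when every value is NULL (the old setDefaultValues flag is gone), while Iceberg columns only default the one stat family that matches the column's Parquet primitive type. A condensed sketch of that dispatch, with a hypothetical helper name that is not part of the SDK:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

// Hypothetical condensation of the Iceberg defaulting rule above: which EP
// stat family receives a default when every value in the column is NULL.
final class EpDefaultFamilySketch {
  enum Family { INT, REAL, STRING }

  static Family familyFor(PrimitiveType type) {
    switch (type.getPrimitiveTypeName()) {
      case BOOLEAN:
      case INT32:
      case INT64:
        return Family.INT; // BigInteger min/max
      case FLOAT:
      case DOUBLE:
        return Family.REAL; // Double min/max
      case BINARY:
        return Family.STRING; // hex-truncated byte min/max
      case FIXED_LEN_BYTE_ARRAY:
        // Decimals ride on fixed-length bytes but are tracked as integers.
        return type.getLogicalTypeAnnotation()
                instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation
            ? Family.INT
            : Family.STRING;
      default:
        throw new IllegalArgumentException("Unsupported: " + type.getPrimitiveTypeName());
    }
  }
}
```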
@@ -27,6 +27,7 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
+ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
@@ -101,8 +102,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
/* Streaming to FDN table doesn't support sub-columns, set up the stats here. */
this.statsMap.put(
column.getInternalName(),
-            new RowBufferStats(
-                column.getName(), column.getCollation(), column.getOrdinal(), null /* fieldId */));
+            new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
@@ -112,11 +112,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
*/
this.tempStatsMap.put(
column.getInternalName(),
-              new RowBufferStats(
-                  column.getName(),
-                  column.getCollation(),
-                  column.getOrdinal(),
-                  null /* fieldId */));
+              new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
}
}

@@ -175,24 +171,19 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (clientBufferParameters.getIsIcebergMode()) {
for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
String columnPath = concatDotPath(columnDescriptor.getPath());
+        PrimitiveType primitiveType = columnDescriptor.getPrimitiveType();

/* set fieldId to 0 for non-structured columns */
-        int fieldId =
-            columnDescriptor.getPath().length == 1
-                ? 0
-                : columnDescriptor.getPrimitiveType().getId().intValue();
+        int fieldId = columnDescriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();

this.statsMap.put(
-            columnPath,
-            new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+            columnPath, new RowBufferStats(columnPath, ordinal, fieldId, primitiveType));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
-              columnPath,
-              new RowBufferStats(
-                  columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+              columnPath, new RowBufferStats(columnPath, ordinal, fieldId, primitiveType));
}
}
}
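For reference, the ordinal/fieldId extraction relies on Parquet field IDs attached to the schema: the ordinal comes from the top-level field's ID, the fieldId from the leaf's ID (forced to 0 for non-nested columns). A self-contained sketch of the same walk over a toy schema (names are illustrative):

```java
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class SchemaIdWalkSketch {
  public static void main(String[] args) {
    // One top-level BINARY column carrying Parquet field id 1.
    PrimitiveType c1 =
        Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(1).named("c1");
    MessageType schema = new MessageType("schema", c1);

    for (ColumnDescriptor descriptor : schema.getColumns()) {
      PrimitiveType primitiveType = descriptor.getPrimitiveType();
      // Top-level columns (path length 1) get fieldId 0, as in setupSchema above.
      int fieldId = descriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
      int ordinal = schema.getType(descriptor.getPath()[0]).getId().intValue();
      System.out.println(descriptor + " -> ordinal=" + ordinal + ", fieldId=" + fieldId);
    }
  }
}
```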
@@ -11,6 +11,7 @@
import java.util.Objects;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
+ import org.apache.parquet.schema.PrimitiveType;

/** Keeps track of the active EP stats, used to generate a file EP info */
class RowBufferStats {
@@ -37,19 +38,58 @@ class RowBufferStats {
private final String collationDefinitionString;
/** Display name is required for the registration endpoint */
private final String columnDisplayName;
+  /** Whether the column is an Iceberg column */
+  private final boolean isIcebergColumn;
+  /** Primitive type of the column, only used for Iceberg columns */
+  private final PrimitiveType primitiveType;
Collaborator review comment: please update the PR description to explain there's a bug fix in this PR too.
-  /** Creates empty stats */
+  /** Creates empty Iceberg column stats */
   RowBufferStats(
-      String columnDisplayName, String collationDefinitionString, int ordinal, Integer fieldId) {
+      String columnDisplayName, int ordinal, Integer fieldId, PrimitiveType primitiveType) {
+    this(
+        columnDisplayName,
+        null /* collationDefinitionString */,
+        ordinal,
+        fieldId,
+        true /* isIcebergColumn */,
+        primitiveType);
+  }
+
+  /** Creates empty FDN column stats */
+  RowBufferStats(String columnDisplayName, String collationDefinitionString, int ordinal) {
+    this(
+        columnDisplayName,
+        collationDefinitionString,
+        ordinal,
+        null /* fieldId */,
+        false /* isIcebergColumn */,
+        null /* primitiveType */);
+  }
+
+  RowBufferStats(
+      String columnDisplayName,
+      String collationDefinitionString,
+      int ordinal,
+      Integer fieldId,
+      boolean isIcebergColumn,
+      PrimitiveType primitiveType) {
this.columnDisplayName = columnDisplayName;
this.collationDefinitionString = collationDefinitionString;
this.ordinal = ordinal;
this.fieldId = fieldId;
+    this.isIcebergColumn = isIcebergColumn;
+    this.primitiveType = primitiveType;
+    if (isIcebergColumn && (primitiveType == null || fieldId == null)) {
+      throw new SFException(
+          ErrorCode.INTERNAL_ERROR,
+          "Iceberg column %s must have a primitive type and field id",
+          columnDisplayName);
+    }
reset();
}

RowBufferStats(String columnDisplayName) {
-    this(columnDisplayName, null, -1, null);
+    this(columnDisplayName, null, -1, null, false, null);
}

void reset() {
@@ -69,7 +109,9 @@ RowBufferStats forkEmpty() {
this.getColumnDisplayName(),
this.getCollationDefinitionString(),
this.getOrdinal(),
-        this.getFieldId());
+        this.getFieldId(),
+        this.getIsIcebergColumn(),
+        this.getPrimitiveType());
}

// TODO performance test this vs in place update
@@ -87,7 +129,9 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right
left.columnDisplayName,
left.getCollationDefinitionString(),
left.getOrdinal(),
-        left.getFieldId());
+        left.getFieldId(),
+        left.getIsIcebergColumn(),
+        left.getPrimitiveType());

if (left.currentMinIntValue != null) {
combined.addIntValue(left.currentMinIntValue);
@@ -238,6 +282,14 @@ Integer getFieldId() {
return fieldId;
}

+  boolean getIsIcebergColumn() {
+    return isIcebergColumn;
+  }
+
+  PrimitiveType getPrimitiveType() {
+    return primitiveType;
+  }
+
/**
* Compares two byte arrays lexicographically. If the two arrays share a common prefix then the
* lexicographic comparison is the result of comparing two elements, as if by Byte.compare(byte,
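Usage sketch for the reworked constructors. RowBufferStats is package-private, so this only compiles from within the same package; the names and values are illustrative:

```java
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class RowBufferStatsUsageSketch {
  static void demo() {
    PrimitiveType longType =
        Types.optional(PrimitiveType.PrimitiveTypeName.INT64).id(7).named("c_long");

    // Iceberg path: primitive type and field id are mandatory...
    RowBufferStats iceberg =
        new RowBufferStats("C_LONG", 1 /* ordinal */, 7 /* fieldId */, longType);

    // FDN path: collation string instead of a Parquet type.
    RowBufferStats fdn = new RowBufferStats("C_TEXT", null /* collation */, 2 /* ordinal */);

    // ...so this would throw SFException(INTERNAL_ERROR) from the full constructor:
    // new RowBufferStats("BAD", 1, 7, null /* primitiveType */);
  }
}
```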
@@ -15,7 +15,10 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
+ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+ import org.apache.parquet.schema.PrimitiveType;
+ import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -98,7 +101,18 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada

channelData
.getColumnEps()
-        .putIfAbsent(columnName, new RowBufferStats(columnName, null, 1, isIceberg ? 0 : null));
+        .putIfAbsent(
+            columnName,
+            isIceberg
+                ? new RowBufferStats(
+                    columnName,
+                    1,
+                    1,
+                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+                        .as(LogicalTypeAnnotation.stringType())
+                        .id(1)
+                        .named("test"))
+                : new RowBufferStats(columnName, null, 1));
channelData.setChannelContext(
new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L));
return Collections.singletonList(channelData);
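The test now has to hand RowBufferStats a real Parquet type for the Iceberg branch, built with the Types fluent builder. The same chain in isolation (only the field name "test" comes from the test itself):

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class TypeBuilderSketch {
  public static void main(String[] args) {
    // optional BINARY, annotated as a logical STRING, carrying Parquet field id 1
    PrimitiveType stringType =
        Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .id(1)
            .named("test");
    System.out.println(stringType); // prints roughly: optional binary test (STRING) = 1
  }
}
```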