Commit ea6378d: Address comments & bug fix

Author: sfc-gh-alhuang, committed Oct 8, 2024
Parent: dd087be
Showing 9 changed files with 652 additions and 187 deletions.
@@ -649,7 +649,7 @@ static EpInfo buildEpInfoFromStats(
     EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
     for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
       RowBufferStats stat = colStat.getValue();
-      FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
+      FileColumnProperties dto = new FileColumnProperties(stat);
       String colName = colStat.getValue().getColumnDisplayName();
       epInfo.getColumnEps().put(colName, dto);
     }
@@ -10,6 +10,9 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.math.BigInteger;
 import java.util.Objects;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /** Audit register endpoint/FileColumnPropertyDTO property list. */
 class FileColumnProperties {
@@ -50,43 +53,113 @@ class FileColumnProperties {
   // Default value to use for min/max real when all data in the given column is NULL
   public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;
 
-  FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
+  // Default value to use for min/max string when all data in the given Iceberg column is NULL
+  public static final String DEFAULT_MIN_MAX_STR_VAL_FOR_EP = "";
+
+  FileColumnProperties(RowBufferStats stats) {
     this.setColumnOrdinal(stats.getOrdinal());
     this.setFieldId(stats.getFieldId());
     this.setCollation(stats.getCollationDefinitionString());
-    this.setMaxIntValue(
-        stats.getCurrentMaxIntValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
-            : stats.getCurrentMaxIntValue());
-    this.setMinIntValue(
-        stats.getCurrentMinIntValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
-            : stats.getCurrentMinIntValue());
-    this.setMinRealValue(
-        stats.getCurrentMinRealValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
-            : stats.getCurrentMinRealValue());
-    this.setMaxRealValue(
-        stats.getCurrentMaxRealValue() == null
-            ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
-            : stats.getCurrentMaxRealValue());
-    this.setMaxLength(stats.getCurrentMaxLength());
-
+    if (stats.getIsIcebergColumn()) {
+      /* For Iceberg columns, only set the min/max stats matching the column's physical type to the default value when all row values are null */
+      switch (stats.getPrimitiveType().getPrimitiveTypeName()) {
+        case BOOLEAN:
+        case INT32:
+        case INT64:
+          this.setMaxIntValue(
+              stats.getCurrentMaxIntValue() == null
+                  ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                  : stats.getCurrentMaxIntValue());
+          this.setMinIntValue(
+              stats.getCurrentMinIntValue() == null
+                  ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                  : stats.getCurrentMinIntValue());
+          break;
+        case FLOAT:
+        case DOUBLE:
+          this.setMaxRealValue(
+              stats.getCurrentMaxRealValue() == null
+                  ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+                  : stats.getCurrentMaxRealValue());
+          this.setMinRealValue(
+              stats.getCurrentMinRealValue() == null
+                  ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+                  : stats.getCurrentMinRealValue());
+          break;
+        case BINARY:
+          this.setMaxStrValue(
+              stats.getCurrentMaxStrValue() == null
+                  ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                  : truncateBytesAsHex(stats.getCurrentMaxStrValue(), false /* truncate down */));
+          this.setMinStrValue(
+              stats.getCurrentMinStrValue() == null
+                  ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                  : truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */));
+          break;
+        case FIXED_LEN_BYTE_ARRAY:
+          if (stats.getPrimitiveType().getLogicalTypeAnnotation()
+              instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+            this.setMaxIntValue(
+                stats.getCurrentMaxIntValue() == null
+                    ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                    : stats.getCurrentMaxIntValue());
+            this.setMinIntValue(
+                stats.getCurrentMinIntValue() == null
+                    ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+                    : stats.getCurrentMinIntValue());
+          } else {
+            this.setMaxStrValue(
+                stats.getCurrentMaxStrValue() == null
+                    ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                    : truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncate up */));
+            this.setMinStrValue(
+                stats.getCurrentMinStrValue() == null
+                    ? DEFAULT_MIN_MAX_STR_VAL_FOR_EP
+                    : truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */));
+          }
+          break;
+        default:
+          throw new SFException(
+              ErrorCode.INTERNAL_ERROR,
+              "Unsupported Iceberg column type: "
+                  + stats.getPrimitiveType().getPrimitiveTypeName());
+      }
+    } else {
+      /* For FDN columns, set every min/max stat to its default value when all row values are null */
+      this.setMaxIntValue(
+          stats.getCurrentMaxIntValue() == null
+              ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+              : stats.getCurrentMaxIntValue());
+      this.setMinIntValue(
+          stats.getCurrentMinIntValue() == null
+              ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
+              : stats.getCurrentMinIntValue());
+      this.setMaxRealValue(
+          stats.getCurrentMaxRealValue() == null
+              ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+              : stats.getCurrentMaxRealValue());
+      this.setMinRealValue(
+          stats.getCurrentMinRealValue() == null
+              ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+              : stats.getCurrentMinRealValue());
+
+      // current hex-encoded max value, truncated up to 32 bytes
+      if (stats.getCurrentMaxStrValue() != null) {
+        String truncatedAsHex =
+            truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncate up */);
+        this.setMaxStrValue(truncatedAsHex);
+      }
+      // current hex-encoded min value, truncated down to 32 bytes
+      if (stats.getCurrentMinStrValue() != null) {
+        String truncatedAsHex =
+            truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncate down */);
+        this.setMinStrValue(truncatedAsHex);
+      }
+    }
+    this.setMaxLength(stats.getCurrentMaxLength());
     this.setMaxStrNonCollated(null);
     this.setMinStrNonCollated(null);
 
-    // current hex-encoded min value, truncated down to 32 bytes
-    if (stats.getCurrentMinStrValue() != null) {
-      String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMinStrValue(), false);
-      this.setMinStrValue(truncatedAsHex);
-    }
-
-    // current hex-encoded max value, truncated up to 32 bytes
-    if (stats.getCurrentMaxStrValue() != null) {
-      String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMaxStrValue(), true);
-      this.setMaxStrValue(truncatedAsHex);
-    }
-
     this.setNullCount(stats.getCurrentNullCount());
     this.setDistinctValues(stats.getDistinctValues());
   }
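Note on the hunk above: default handling for all-NULL columns now keys off the column's Parquet physical type instead of a caller-supplied setDefaultValues flag. The following is a minimal sketch (not part of the commit) of what that means for an Iceberg INT32 column with no non-NULL rows; it assumes it sits in the same package as the package-private RowBufferStats and FileColumnProperties classes, and the column name, ordinal, and field id are made up.

import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class EpDefaultsSketch {
  static FileColumnProperties allNullIcebergIntColumn() {
    // An INT32 Iceberg column with field id 1 and no values accumulated yet.
    PrimitiveType type =
        Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("c1");
    RowBufferStats stats = new RowBufferStats("c1", 1 /* ordinal */, 1 /* fieldId */, type);

    // With no rows added, getCurrentMinIntValue()/getCurrentMaxIntValue() are null,
    // so the constructor falls back to DEFAULT_MIN_MAX_INT_VAL_FOR_EP; the real and
    // string EP fields are left unset for this physical type.
    return new FileColumnProperties(stats);
  }
}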
@@ -27,6 +27,7 @@
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
 /**
@@ -101,8 +102,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
       /* Streaming to FDN table doesn't support sub-columns, set up the stats here. */
       this.statsMap.put(
           column.getInternalName(),
-          new RowBufferStats(
-              column.getName(), column.getCollation(), column.getOrdinal(), null /* fieldId */));
+          new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
 
       if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
           || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
@@ -112,11 +112,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
          */
         this.tempStatsMap.put(
             column.getInternalName(),
-            new RowBufferStats(
-                column.getName(),
-                column.getCollation(),
-                column.getOrdinal(),
-                null /* fieldId */));
+            new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal()));
       }
     }

@@ -175,24 +171,19 @@ public void setupSchema(List<ColumnMetadata> columns) {
     if (clientBufferParameters.getIsIcebergMode()) {
       for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
         String columnPath = concatDotPath(columnDescriptor.getPath());
+        PrimitiveType primitiveType = columnDescriptor.getPrimitiveType();
 
         /* set fieldId to 0 for non-structured columns */
-        int fieldId =
-            columnDescriptor.getPath().length == 1
-                ? 0
-                : columnDescriptor.getPrimitiveType().getId().intValue();
+        int fieldId = columnDescriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
         int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
 
         this.statsMap.put(
-            columnPath,
-            new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+            columnPath, new RowBufferStats(columnPath, ordinal, fieldId, primitiveType));
 
         if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
             || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
           this.tempStatsMap.put(
-              columnPath,
-              new RowBufferStats(
-                  columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+              columnPath, new RowBufferStats(columnPath, ordinal, fieldId, primitiveType));
         }
       }
     }
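To make the ordinal/fieldId derivation above concrete: ordinal comes from the field id of the top-level column, while fieldId is 0 for top-level columns and the leaf's own Parquet field id for nested ones. A standalone sketch under stated assumptions (the two-column schema is made up, and String.join stands in for the library's internal concatDotPath helper):

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class FieldIdSketch {
  public static void main(String[] args) {
    MessageType schema =
        Types.buildMessage()
            .addField(Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("a"))
            .addField(
                Types.optionalGroup()
                    .addField(
                        Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(3).named("c"))
                    .id(2)
                    .named("b"))
            .named("schema");

    for (ColumnDescriptor cd : schema.getColumns()) {
      String columnPath = String.join(".", cd.getPath());
      PrimitiveType primitiveType = cd.getPrimitiveType();
      int fieldId = cd.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
      int ordinal = schema.getType(cd.getPath()[0]).getId().intValue();
      // Prints "a ordinal=1 fieldId=0", then "b.c ordinal=2 fieldId=3".
      System.out.println(columnPath + " ordinal=" + ordinal + " fieldId=" + fieldId);
    }
  }
}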
@@ -11,6 +11,7 @@
 import java.util.Objects;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.schema.PrimitiveType;
 
 /** Keeps track of the active EP stats, used to generate a file EP info */
 class RowBufferStats {
Expand All @@ -37,19 +38,58 @@ class RowBufferStats {
private final String collationDefinitionString;
/** Display name is required for the registration endpoint */
private final String columnDisplayName;
/** Whether the column is an Iceberg column */
private final boolean isIcebergColumn;
/** Primitive type of the column, only used for Iceberg columns */
private final PrimitiveType primitiveType;

/** Creates empty stats */
/** Creates empty Iceberg column stats */
RowBufferStats(
String columnDisplayName, String collationDefinitionString, int ordinal, Integer fieldId) {
String columnDisplayName, int ordinal, Integer fieldId, PrimitiveType primitiveType) {
this(
columnDisplayName,
null /* collationDefinitionString */,
ordinal,
fieldId,
true /* isIcebergColumn */,
primitiveType);
}

/** Creates empty FDN column stats */
RowBufferStats(String columnDisplayName, String collationDefinitionString, int ordinal) {
this(
columnDisplayName,
collationDefinitionString,
ordinal,
null /* fieldId */,
false /* isIcebergColumn */,
null /* primitiveType */);
}

RowBufferStats(
String columnDisplayName,
String collationDefinitionString,
int ordinal,
Integer fieldId,
boolean isIcebergColumn,
PrimitiveType primitiveType) {
this.columnDisplayName = columnDisplayName;
this.collationDefinitionString = collationDefinitionString;
this.ordinal = ordinal;
this.fieldId = fieldId;
this.isIcebergColumn = isIcebergColumn;
this.primitiveType = primitiveType;
if (isIcebergColumn && (primitiveType == null || fieldId == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Iceberg column %s must have a primitive type and field id",
columnDisplayName);
}
reset();
}

RowBufferStats(String columnDisplayName) {
this(columnDisplayName, null, -1, null);
this(columnDisplayName, null, -1, null, false, null);
}

void reset() {
@@ -69,7 +109,9 @@ RowBufferStats forkEmpty() {
         this.getColumnDisplayName(),
         this.getCollationDefinitionString(),
         this.getOrdinal(),
-        this.getFieldId());
+        this.getFieldId(),
+        this.getIsIcebergColumn(),
+        this.getPrimitiveType());
   }
 
   // TODO performance test this vs in place update
@@ -87,7 +129,9 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right
         left.columnDisplayName,
         left.getCollationDefinitionString(),
         left.getOrdinal(),
-        left.getFieldId());
+        left.getFieldId(),
+        left.getIsIcebergColumn(),
+        left.getPrimitiveType());
 
     if (left.currentMinIntValue != null) {
       combined.addIntValue(left.currentMinIntValue);
@@ -238,6 +282,14 @@ Integer getFieldId() {
     return fieldId;
   }
 
+  boolean getIsIcebergColumn() {
+    return isIcebergColumn;
+  }
+
+  PrimitiveType getPrimitiveType() {
+    return primitiveType;
+  }
+
   /**
    * Compares two byte arrays lexicographically. If the two arrays share a common prefix then the
    * lexicographic comparison is the result of comparing two elements, as if by Byte.compare(byte,
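A short usage sketch of the two new public constructors and the invariant enforced by the delegating constructor above (same-package assumption again, since the class is package-private; names and values are illustrative):

import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class RowBufferStatsSketch {
  static void demo() {
    // Iceberg column: a Parquet primitive type and a field id are mandatory.
    RowBufferStats iceberg =
        new RowBufferStats(
            "col",
            1 /* ordinal */,
            2 /* fieldId */,
            Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named("col"));

    // FDN column: a collation string instead of type info; fieldId and
    // primitiveType stay null internally.
    RowBufferStats fdn = new RowBufferStats("col", null /* collation */, 1 /* ordinal */);

    // An Iceberg column with a null primitiveType or fieldId would throw
    // SFException(ErrorCode.INTERNAL_ERROR, ...) from the delegating constructor.
  }
}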
@@ -15,7 +15,10 @@
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
 import org.apache.parquet.hadoop.BdecParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -98,7 +101,18 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
 
     channelData
         .getColumnEps()
-        .putIfAbsent(columnName, new RowBufferStats(columnName, null, 1, isIceberg ? 0 : null));
+        .putIfAbsent(
+            columnName,
+            isIceberg
+                ? new RowBufferStats(
+                    columnName,
+                    1,
+                    1,
+                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+                        .as(LogicalTypeAnnotation.stringType())
+                        .id(1)
+                        .named("test"))
+                : new RowBufferStats(columnName, null, 1));
     channelData.setChannelContext(
         new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L));
     return Collections.singletonList(channelData);
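The test now has to supply a real Parquet type on the Iceberg path. A standalone illustration of the org.apache.parquet.schema.Types builder chain it uses, building an optional BINARY column annotated as a UTF-8 string with field id 1 and reading the pieces back:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class ParquetTypeSketch {
  public static void main(String[] args) {
    PrimitiveType t =
        Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .id(1)
            .named("test");
    System.out.println(t.getPrimitiveTypeName());      // BINARY
    System.out.println(t.getLogicalTypeAnnotation());  // STRING
    System.out.println(t.getId());                     // 1
  }
}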
