diff --git a/pom.xml b/pom.xml
index 1725ac89a..f0023e815 100644
--- a/pom.xml
+++ b/pom.xml
@@ -415,6 +415,12 @@
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.26.3</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -634,6 +640,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
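
Illustrative sketch (not part of the patch): the new test-scoped assertj-core dependency enables fluent assertions of the following style. The class name and asserted values here are made up.

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.Test;

public class AssertjStyleSketchTest {
  @Test
  public void fluentAssertions() {
    // Chained, readable assertions instead of Assert.assertEquals / assertTrue.
    assertThat("6162636465").hasSize(10).startsWith("61");

    // Exception assertions without try/catch boilerplate.
    assertThatThrownBy(
            () -> {
              throw new IllegalArgumentException("bad input");
            })
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("bad input");
  }
}
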
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index f6c26f9bd..7b0adaabc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -641,15 +641,16 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
- * @param setDefaultValues: whether to set default values for null fields the EPs
+ * @param setAllDefaultValues: whether to set default values for all null fields in the EPs,
+ * irrespective of the data type of this column
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
- long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
+ long rowCount, Map<String, RowBufferStats> colStats, boolean setAllDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
- FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
+ FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
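
Illustrative sketch (not part of the patch): on the FDN/BDEC path the renamed flag is passed as true, so integer and real min/max fall back to zero even for a column that only ever held NULLs. The class below is hypothetical; it assumes the net.snowflake.ingest.streaming.internal package because the involved classes are package-private, and the column name is made up.

package net.snowflake.ingest.streaming.internal;

import java.util.HashMap;
import java.util.Map;

class SetAllDefaultValuesSketch {
  public static void main(String[] args) {
    Map<String, RowBufferStats> colStats = new HashMap<>();
    RowBufferStats strStats = new RowBufferStats("STR_COL"); // no values ever added
    strStats.incCurrentNullCount(); // the single row in this column is NULL
    colStats.put("STR_COL", strStats);

    EpInfo epInfo =
        AbstractRowBuffer.buildEpInfoFromStats(1, colStats, true /* setAllDefaultValues */);
    FileColumnProperties props = epInfo.getColumnEps().get("STR_COL");
    System.out.println(props.getMinIntValue()); // 0 (DEFAULT_MIN_MAX_INT_VAL_FOR_EP)
    System.out.println(props.getMinRealValue()); // 0.0 (DEFAULT_MIN_MAX_REAL_VAL_FOR_EP)
    System.out.println(props.getMinStrValue()); // null: strings are not defaulted on this path
  }
}
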
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
index edc8fd4c9..307093d3e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
@@ -134,7 +134,7 @@ static Blob constructBlobAndMetadata(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
- internalParameterProvider.setDefaultValuesInEp()))
+ internalParameterProvider.setAllDefaultValuesInEp()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
index 8d8bff3f5..035a88373 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -1150,7 +1150,7 @@ static void checkValueInRange(
static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) {
if (bytes.length != length) {
throw new SFException(
- ErrorCode.INVALID_FORMAT_ROW,
+ ErrorCode.INVALID_VALUE_ROW,
String.format(
"Binary length mismatch: expected=%d, actual=%d, rowIndex:%d",
length, bytes.length, insertRowIndex));
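
Illustrative sketch (not part of the patch): the same length mismatch now surfaces as ErrorCode.INVALID_VALUE_ROW instead of INVALID_FORMAT_ROW. The call below is hypothetical and assumes the same package, since checkFixedLengthByteArray is package-private; the byte values and expected length are made up.

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.SFException;

class BinaryLengthMismatchSketch {
  public static void main(String[] args) {
    byte[] threeBytes = new byte[] {0x01, 0x02, 0x03};
    try {
      // Column declared as a 2-byte fixed-length binary; the row index is illustrative.
      DataValidationUtil.checkFixedLengthByteArray(threeBytes, 2, 0L /* insertRowIndex */);
    } catch (SFException e) {
      // Now raised with ErrorCode.INVALID_VALUE_ROW:
      // "Binary length mismatch: expected=2, actual=3, rowIndex:0"
      System.out.println(e.getMessage());
    }
  }
}
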
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
index 3a8dbc2b6..b3c7aedf5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
@@ -10,6 +10,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
@@ -50,45 +53,107 @@ class FileColumnProperties {
// Default value to use for min/max real when all data in the given column is NULL
public static final Double DEFAULT_MIN_MAX_REAL_VAL_FOR_EP = 0d;
- FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
+ // Default value to use for min/max string when all data in the given Iceberg column is NULL
+ public static final String DEFAULT_MIN_MAX_STR_VAL_FOR_EP = "";
+
+ /**
+ * @param setAllDefaultValues Whether to set defaults for ALL fields or only some: BDEC sets
+ * defaults for all fields, whereas Iceberg does not.
+ */
+ FileColumnProperties(RowBufferStats stats, boolean setAllDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
this.setFieldId(stats.getFieldId());
this.setCollation(stats.getCollationDefinitionString());
+
+ if (setAllDefaultValues) {
+ /* For FDN columns, set every field to its default value if all row values are null */
+ setIntValues(stats);
+ setRealValues(stats);
+ setStringValues(stats, false /* replaceNullWithEmptyString */);
+ } else {
+ /* For Iceberg columns, only set the min/max stats matching the column's data type to default values if all row values are null */
+ switch (stats.getPrimitiveType().getPrimitiveTypeName()) {
+ case BOOLEAN:
+ case INT32:
+ case INT64:
+ setIntValues(stats);
+ break;
+
+ case FLOAT:
+ case DOUBLE:
+ setRealValues(stats);
+ break;
+
+ case BINARY:
+ setStringValues(stats, true /* replaceNullWithEmptyString */);
+ break;
+
+ case FIXED_LEN_BYTE_ARRAY:
+ if (stats.getPrimitiveType().getLogicalTypeAnnotation()
+ instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ setIntValues(stats);
+ } else {
+ setStringValues(stats, true /* replaceNullWithEmptyString */);
+ }
+ break;
+
+ default:
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ "Unsupported Iceberg column type: "
+ + stats.getPrimitiveType().getPrimitiveTypeName());
+ }
+ }
+
+ this.setMaxLength(stats.getCurrentMaxLength());
+ this.setMaxStrNonCollated(null);
+ this.setMinStrNonCollated(null);
+ this.setNullCount(stats.getCurrentNullCount());
+ this.setDistinctValues(stats.getDistinctValues());
+ }
+
+ private void setIntValues(RowBufferStats stats) {
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
- ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
+ ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
: stats.getCurrentMaxIntValue());
+
this.setMinIntValue(
stats.getCurrentMinIntValue() == null
- ? (setDefaultValues ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP : null)
+ ? DEFAULT_MIN_MAX_INT_VAL_FOR_EP
: stats.getCurrentMinIntValue());
- this.setMinRealValue(
- stats.getCurrentMinRealValue() == null
- ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
- : stats.getCurrentMinRealValue());
+ }
+
+ private void setRealValues(RowBufferStats stats) {
this.setMaxRealValue(
stats.getCurrentMaxRealValue() == null
- ? (setDefaultValues ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP : null)
+ ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
: stats.getCurrentMaxRealValue());
- this.setMaxLength(stats.getCurrentMaxLength());
- this.setMaxStrNonCollated(null);
- this.setMinStrNonCollated(null);
-
- // current hex-encoded min value, truncated down to 32 bytes
- if (stats.getCurrentMinStrValue() != null) {
- String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMinStrValue(), false);
- this.setMinStrValue(truncatedAsHex);
- }
+ this.setMinRealValue(
+ stats.getCurrentMinRealValue() == null
+ ? DEFAULT_MIN_MAX_REAL_VAL_FOR_EP
+ : stats.getCurrentMinRealValue());
+ }
+
+ private void setStringValues(RowBufferStats stats, boolean replaceNullWithEmptyString) {
// current hex-encoded max value, truncated up to 32 bytes
if (stats.getCurrentMaxStrValue() != null) {
- String truncatedAsHex = truncateBytesAsHex(stats.getCurrentMaxStrValue(), true);
+ String truncatedAsHex =
+ truncateBytesAsHex(stats.getCurrentMaxStrValue(), true /* truncateUp */);
this.setMaxStrValue(truncatedAsHex);
+ } else if (replaceNullWithEmptyString) {
+ this.setMaxStrValue(DEFAULT_MIN_MAX_STR_VAL_FOR_EP);
}
- this.setNullCount(stats.getCurrentNullCount());
- this.setDistinctValues(stats.getDistinctValues());
+ // current hex-encoded min value, truncated down to 32 bytes
+ if (stats.getCurrentMinStrValue() != null) {
+ String truncatedAsHex =
+ truncateBytesAsHex(stats.getCurrentMinStrValue(), false /* truncateUp */);
+ this.setMinStrValue(truncatedAsHex);
+ } else if (replaceNullWithEmptyString) {
+ this.setMinStrValue(DEFAULT_MIN_MAX_STR_VAL_FOR_EP);
+ }
}
@JsonProperty("columnId")
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
index 11a2858f6..4ab8ecc4b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
@@ -20,9 +20,11 @@ boolean getEnableChunkEncryption() {
return !isIcebergMode;
}
- boolean setDefaultValuesInEp() {
- // When in Iceberg mode, we need to populate nulls (instead of zeroes) in the minIntValue /
- // maxIntValue / minRealValue / maxRealValue fields of the EP Metadata.
+ boolean setAllDefaultValuesInEp() {
+ // When in non-Iceberg mode, we want to default the stats for all data types
+ // (int/real/string) to fixed values.
+ // However, when in Iceberg mode, we want to default only those stats that are relevant
+ // to the column's data type.
return !isIcebergMode;
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index ed19971ab..339210f65 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -27,6 +27,7 @@
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
/**
@@ -88,21 +89,26 @@ public void setupSchema(List<ColumnMetadata> columns) {
/* Set up fields using top level column information */
validateColumnCollation(column);
ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id);
- parquetTypes.add(typeInfo.getParquetType());
+ Type parquetType = typeInfo.getParquetType();
+ parquetTypes.add(parquetType);
this.metadata.putAll(typeInfo.getMetadata());
int columnIndex = parquetTypes.size() - 1;
- fieldIndex.put(
- column.getInternalName(),
- new ParquetColumn(column, columnIndex, typeInfo.getParquetType()));
+ fieldIndex.put(column.getInternalName(), new ParquetColumn(column, columnIndex, parquetType));
+
if (!column.getNullable()) {
addNonNullableFieldName(column.getInternalName());
}
+
if (!clientBufferParameters.getIsIcebergMode()) {
/* Streaming to FDN table doesn't support sub-columns, set up the stats here. */
this.statsMap.put(
column.getInternalName(),
new RowBufferStats(
- column.getName(), column.getCollation(), column.getOrdinal(), null /* fieldId */));
+ column.getName(),
+ column.getCollation(),
+ column.getOrdinal(),
+ null /* fieldId */,
+ parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null));
if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
@@ -116,7 +122,8 @@ public void setupSchema(List<ColumnMetadata> columns) {
column.getName(),
column.getCollation(),
column.getOrdinal(),
- null /* fieldId */));
+ null /* fieldId */,
+ parquetType.isPrimitive() ? parquetType.asPrimitiveType() : null));
}
}
@@ -175,24 +182,27 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (clientBufferParameters.getIsIcebergMode()) {
for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
String columnPath = concatDotPath(columnDescriptor.getPath());
+ PrimitiveType primitiveType = columnDescriptor.getPrimitiveType();
/* set fieldId to 0 for non-structured columns */
- int fieldId =
- columnDescriptor.getPath().length == 1
- ? 0
- : columnDescriptor.getPrimitiveType().getId().intValue();
+ int fieldId = columnDescriptor.getPath().length == 1 ? 0 : primitiveType.getId().intValue();
int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
this.statsMap.put(
columnPath,
- new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+ new RowBufferStats(
+ columnPath, null /* collationDefinitionString */, ordinal, fieldId, primitiveType));
if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
columnPath,
new RowBufferStats(
- columnPath, null /* collationDefinitionString */, ordinal, fieldId));
+ columnPath,
+ null /* collationDefinitionString */,
+ ordinal,
+ fieldId,
+ primitiveType));
}
}
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
index 2fae695f0..4d7781c78 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
@@ -11,6 +11,7 @@
import java.util.Objects;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.schema.PrimitiveType;
/** Keeps track of the active EP stats, used to generate a file EP info */
class RowBufferStats {
@@ -25,6 +26,12 @@ class RowBufferStats {
*/
private final Integer fieldId;
+ private final String collationDefinitionString;
+ /** Display name is required for the registration endpoint */
+ private final String columnDisplayName;
+ /** Primitive type of the column, only used for Iceberg columns */
+ private final PrimitiveType primitiveType;
+
private byte[] currentMinStrValue;
private byte[] currentMaxStrValue;
private BigInteger currentMinIntValue;
@@ -34,22 +41,27 @@ class RowBufferStats {
private long currentNullCount;
// for binary or string columns
private long currentMaxLength;
- private final String collationDefinitionString;
- /** Display name is required for the registration endpoint */
- private final String columnDisplayName;
- /** Creates empty stats */
RowBufferStats(
- String columnDisplayName, String collationDefinitionString, int ordinal, Integer fieldId) {
+ String columnDisplayName,
+ String collationDefinitionString,
+ int ordinal,
+ Integer fieldId,
+ PrimitiveType primitiveType) {
this.columnDisplayName = columnDisplayName;
this.collationDefinitionString = collationDefinitionString;
this.ordinal = ordinal;
this.fieldId = fieldId;
+ this.primitiveType = primitiveType;
reset();
}
RowBufferStats(String columnDisplayName) {
- this(columnDisplayName, null, -1, null);
+ this(columnDisplayName, null, -1, null, null);
+ }
+
+ RowBufferStats(String columnDisplayName, PrimitiveType primitiveType) {
+ this(columnDisplayName, null, -1, null, primitiveType);
}
void reset() {
@@ -69,7 +81,8 @@ RowBufferStats forkEmpty() {
this.getColumnDisplayName(),
this.getCollationDefinitionString(),
this.getOrdinal(),
- this.getFieldId());
+ this.getFieldId(),
+ this.getPrimitiveType());
}
// TODO performance test this vs in place update
@@ -87,7 +100,8 @@ static RowBufferStats getCombinedStats(RowBufferStats left, RowBufferStats right
left.columnDisplayName,
left.getCollationDefinitionString(),
left.getOrdinal(),
- left.getFieldId());
+ left.getFieldId(),
+ left.getPrimitiveType());
if (left.currentMinIntValue != null) {
combined.addIntValue(left.currentMinIntValue);
@@ -238,6 +252,10 @@ Integer getFieldId() {
return fieldId;
}
+ PrimitiveType getPrimitiveType() {
+ return primitiveType;
+ }
+
/**
* Compares two byte arrays lexicographically. If the two arrays share a common prefix then the
* lexicographic comparison is the result of comparing two elements, as if by Byte.compare(byte,
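
Illustrative sketch (not part of the patch): the primitive type handed to the new constructor survives forkEmpty() and getCombinedStats(), which is what later allows FileColumnProperties to pick per-type defaults. Hypothetical class, same package assumed; the column name and values are made up.

package net.snowflake.ingest.streaming.internal;

import java.math.BigInteger;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

class RowBufferStatsTypeSketch {
  public static void main(String[] args) {
    PrimitiveType int64 =
        Types.optional(PrimitiveType.PrimitiveTypeName.INT64).id(3).named("long_col");
    RowBufferStats left = new RowBufferStats("LONG_COL", int64);
    RowBufferStats right = left.forkEmpty(); // keeps display name, ordinal, field id and type

    left.addIntValue(BigInteger.valueOf(7));
    right.addIntValue(BigInteger.valueOf(42));

    RowBufferStats combined = RowBufferStats.getCombinedStats(left, right);
    System.out.println(combined.getPrimitiveType().getPrimitiveTypeName()); // INT64
    System.out.println(combined.getCurrentMaxIntValue()); // 42
  }
}
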
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 185fa5ded..1330152a4 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -15,7 +15,10 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -98,7 +101,19 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
channelData
.getColumnEps()
- .putIfAbsent(columnName, new RowBufferStats(columnName, null, 1, isIceberg ? 0 : null));
+ .putIfAbsent(
+ columnName,
+ isIceberg
+ ? new RowBufferStats(
+ columnName,
+ null,
+ 1,
+ 1,
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .id(1)
+ .named("test"))
+ : new RowBufferStats(columnName, null, 1, null, null));
channelData.setChannelContext(
new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L));
return Collections.singletonList(channelData);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
index 7b131b310..f4ffa11c4 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FileColumnPropertiesTest.java
@@ -4,6 +4,9 @@
package net.snowflake.ingest.streaming.internal;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;
@@ -19,10 +22,21 @@ public static Object[] isIceberg() {
@Test
public void testFileColumnPropertiesConstructor() {
// Test simple construction
- RowBufferStats stats = new RowBufferStats("COL", null, 1, isIceberg ? 1 : null);
+ RowBufferStats stats =
+ isIceberg
+ ? new RowBufferStats(
+ "COL",
+ null,
+ 1,
+ 1,
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .id(1)
+ .named("test"))
+ : new RowBufferStats("COL", null, 1, null, null);
stats.addStrValue("bcd");
stats.addStrValue("abcde");
- FileColumnProperties props = new FileColumnProperties(stats, isIceberg);
+ FileColumnProperties props = new FileColumnProperties(stats, !isIceberg);
Assert.assertEquals(1, props.getColumnOrdinal());
Assert.assertEquals(isIceberg ? 1 : null, props.getFieldId());
Assert.assertEquals("6162636465", props.getMinStrValue());
@@ -31,10 +45,21 @@ public void testFileColumnPropertiesConstructor() {
Assert.assertNull(props.getMaxStrNonCollated());
// Test that truncation is performed
- stats = new RowBufferStats("COL", null, 1, isIceberg ? 0 : null);
+ stats =
+ isIceberg
+ ? new RowBufferStats(
+ "COL",
+ null,
+ 1,
+ 1,
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .id(1)
+ .named("test"))
+ : new RowBufferStats("COL", null, 1, null, null);
stats.addStrValue("aßßßßßßßßßßßßßßßß");
Assert.assertEquals(33, stats.getCurrentMinStrValue().length);
- props = new FileColumnProperties(stats, isIceberg);
+ props = new FileColumnProperties(stats, !isIceberg);
Assert.assertEquals(1, props.getColumnOrdinal());
Assert.assertNull(props.getMinStrNonCollated());
Assert.assertNull(props.getMaxStrNonCollated());
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index cd7354f09..ca0fd5295 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -52,6 +52,8 @@
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -877,8 +879,12 @@ public void testBuildAndUpload() throws Exception {
Map<String, RowBufferStats> eps1 = new HashMap<>();
Map<String, RowBufferStats> eps2 = new HashMap<>();
- RowBufferStats stats1 = new RowBufferStats("COL1");
- RowBufferStats stats2 = new RowBufferStats("COL1");
+ RowBufferStats stats1 =
+ new RowBufferStats(
+ "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"));
+ RowBufferStats stats2 =
+ new RowBufferStats(
+ "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"));
eps1.put("one", stats1);
eps2.put("one", stats2);
@@ -1115,7 +1121,9 @@ public void testBlobBuilder() throws Exception {
Map<String, RowBufferStats> eps1 = new HashMap<>();
- RowBufferStats stats1 = new RowBufferStats("COL1");
+ RowBufferStats stats1 =
+ new RowBufferStats(
+ "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1"));
eps1.put("one", stats1);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 5e5b96fc3..5d7873b95 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -29,6 +29,8 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.parquet.hadoop.BdecParquetReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -563,12 +565,18 @@ private void testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption o
public void testBuildEpInfoFromStats() {
Map<String, RowBufferStats> colStats = new HashMap<>();
- RowBufferStats stats1 = new RowBufferStats("intColumn");
+ RowBufferStats stats1 =
+ new RowBufferStats(
+ "intColumn",
+ Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"));
stats1.addIntValue(BigInteger.valueOf(2));
stats1.addIntValue(BigInteger.valueOf(10));
stats1.addIntValue(BigInteger.valueOf(1));
- RowBufferStats stats2 = new RowBufferStats("strColumn");
+ RowBufferStats stats2 =
+ new RowBufferStats(
+ "strColumn",
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"));
stats2.addStrValue("alice");
stats2.addStrValue("bob");
stats2.incCurrentNullCount();
@@ -603,8 +611,14 @@ public void testBuildEpInfoFromNullColumnStats() {
final String realColName = "realCol";
Map<String, RowBufferStats> colStats = new HashMap<>();
- RowBufferStats stats1 = new RowBufferStats(intColName);
- RowBufferStats stats2 = new RowBufferStats(realColName);
+ RowBufferStats stats1 =
+ new RowBufferStats(
+ intColName,
+ Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named(intColName));
+ RowBufferStats stats2 =
+ new RowBufferStats(
+ realColName,
+ Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).id(2).named(realColName));
stats1.incCurrentNullCount();
stats2.incCurrentNullCount();
@@ -618,22 +632,18 @@ public void testBuildEpInfoFromNullColumnStats() {
FileColumnProperties intColumnResult = columnResults.get(intColName);
Assert.assertEquals(-1, intColumnResult.getDistinctValues());
Assert.assertEquals(
- isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
- intColumnResult.getMinIntValue());
+ FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMinIntValue());
Assert.assertEquals(
- isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP,
- intColumnResult.getMaxIntValue());
+ FileColumnProperties.DEFAULT_MIN_MAX_INT_VAL_FOR_EP, intColumnResult.getMaxIntValue());
Assert.assertEquals(1, intColumnResult.getNullCount());
Assert.assertEquals(0, intColumnResult.getMaxLength());
FileColumnProperties realColumnResult = columnResults.get(realColName);
Assert.assertEquals(-1, intColumnResult.getDistinctValues());
Assert.assertEquals(
- isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
- realColumnResult.getMinRealValue());
+ FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMinRealValue());
Assert.assertEquals(
- isIcebergMode ? null : FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP,
- realColumnResult.getMaxRealValue());
+ FileColumnProperties.DEFAULT_MIN_MAX_REAL_VAL_FOR_EP, realColumnResult.getMaxRealValue());
Assert.assertEquals(1, realColumnResult.getNullCount());
Assert.assertEquals(0, realColumnResult.getMaxLength());
}
@@ -642,12 +652,18 @@ public void testBuildEpInfoFromNullColumnStats() {
public void testInvalidEPInfo() {
Map<String, RowBufferStats> colStats = new HashMap<>();
- RowBufferStats stats1 = new RowBufferStats("intColumn");
+ RowBufferStats stats1 =
+ new RowBufferStats(
+ "intColumn",
+ Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("intColumn"));
stats1.addIntValue(BigInteger.valueOf(2));
stats1.addIntValue(BigInteger.valueOf(10));
stats1.addIntValue(BigInteger.valueOf(1));
- RowBufferStats stats2 = new RowBufferStats("strColumn");
+ RowBufferStats stats2 =
+ new RowBufferStats(
+ "strColumn",
+ Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).id(2).named("strColumn"));
stats2.addStrValue("alice");
stats2.incCurrentNullCount();
stats2.incCurrentNullCount();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index 0dbeeebee..14de4342b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -61,6 +61,8 @@
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
import org.bouncycastle.asn1.nist.NISTObjectIdentifiers;
import org.bouncycastle.openssl.jcajce.JcaPEMWriter;
import org.bouncycastle.operator.OperatorCreationException;
@@ -500,8 +502,11 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
.build();
Map<String, RowBufferStats> columnEps = new HashMap<>();
- columnEps.put("column", new RowBufferStats("COL1"));
- EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
+ columnEps.put(
+ "column",
+ new RowBufferStats(
+ "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")));
+ EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode);
ChunkMetadata chunkMetadata =
ChunkMetadata.builder()
@@ -549,8 +554,11 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
private Pair<List<BlobMetadata>, Set<ChunkRegisterStatus>> getRetryBlobMetadata() {
Map<String, RowBufferStats> columnEps = new HashMap<>();
- columnEps.put("column", new RowBufferStats("COL1"));
- EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, isIcebergMode);
+ columnEps.put(
+ "column",
+ new RowBufferStats(
+ "COL1", Types.optional(PrimitiveType.PrimitiveTypeName.INT32).id(1).named("COL1")));
+ EpInfo epInfo = AbstractRowBuffer.buildEpInfoFromStats(1, columnEps, !isIcebergMode);
ChannelMetadata channelMetadata1 =
ChannelMetadata.builder()
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
index c39ffe967..3c58fb000 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -21,12 +21,15 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
+import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
+import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;
+import net.snowflake.ingest.utils.SnowflakeURL;
+import net.snowflake.ingest.utils.Utils;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
@@ -60,7 +63,7 @@ public abstract class AbstractDataTypeTest {
private String schemaName = "PUBLIC";
private SnowflakeStreamingIngestClient client;
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ protected static final ObjectMapper objectMapper = new ObjectMapper();
@Parameters(name = "{index}: {0}")
public static Object[] compressionAlgorithms() {
@@ -69,8 +72,7 @@ public static Object[] compressionAlgorithms() {
@Parameter public String compressionAlgorithm;
- @Before
- public void before() throws Exception {
+ public void before(boolean isIceberg) throws Exception {
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
@@ -84,7 +86,16 @@ public void before() throws Exception {
props.setProperty(ROLE, "ACCOUNTADMIN");
}
props.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
- client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build();
+
+ // Override Iceberg mode client lag to 1 second for faster test execution
+ Map<String, Object> parameterMap = new HashMap<>();
+ parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L);
+
+ Properties prop = Utils.createProperties(props);
+ SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));
+ client =
+ new SnowflakeStreamingIngestClientInternal<>(
+ "client1", accountURL, prop, parameterMap, isIceberg, false);
}
@After
@@ -112,6 +123,23 @@ protected String createTable(String dataType) throws SQLException {
String.format(
"create or replace table %s (%s string, %s %s)",
tableName, SOURCE_COLUMN_NAME, VALUE_COLUMN_NAME, dataType));
+
+ return tableName;
+ }
+
+ protected String createIcebergTable(String dataType) throws SQLException {
+ String tableName = getRandomIdentifier();
+ String baseLocation =
+ String.format("%s/%s/%s", databaseName, dataType.replace(" ", "_"), tableName);
+ conn.createStatement()
+ .execute(
+ String.format(
+ "create or replace iceberg table %s (%s string, %s %s) "
+ + "catalog = 'SNOWFLAKE' "
+ + "external_volume = 'streaming_ingest' "
+ + "base_location = '%s';",
+ tableName, SOURCE_COLUMN_NAME, VALUE_COLUMN_NAME, dataType, baseLocation));
+
return tableName;
}
@@ -196,7 +224,14 @@ protected <T> void expectNotSupported(String dataType, T value) throws Exception
*/
<VALUE> void testIngestion(String dataType, VALUE expectedValue, Provider<VALUE> selectProvider)
throws Exception {
- ingestAndAssert(dataType, expectedValue, null, expectedValue, null, selectProvider);
+ ingestAndAssert(
+ dataType, expectedValue, null, expectedValue, null, selectProvider, false /* isIceberg */);
+ }
+
+ <VALUE> void testIcebergIngestion(
+ String dataType, VALUE expectedValue, Provider<VALUE> selectProvider) throws Exception {
+ ingestAndAssert(
+ dataType, expectedValue, null, expectedValue, null, selectProvider, true /* isIceberg */);
}
/**
@@ -209,7 +244,30 @@ void testIngestion(
JDBC_READ expectedValue,
Provider<JDBC_READ> selectProvider)
throws Exception {
- ingestAndAssert(dataType, streamingIngestWriteValue, null, expectedValue, null, selectProvider);
+ ingestAndAssert(
+ dataType,
+ streamingIngestWriteValue,
+ null,
+ expectedValue,
+ null,
+ selectProvider,
+ false /* isIceberg */);
+ }
+
+ <STREAMING_INGEST_WRITE, JDBC_READ> void testIcebergIngestion(
+ String dataType,
+ STREAMING_INGEST_WRITE streamingIngestWriteValue,
+ JDBC_READ expectedValue,
+ Provider<JDBC_READ> selectProvider)
+ throws Exception {
+ ingestAndAssert(
+ dataType,
+ streamingIngestWriteValue,
+ null,
+ expectedValue,
+ null,
+ selectProvider,
+ true /* isIceberg */);
}
/**
@@ -218,7 +276,7 @@ void testIngestion(
*/
<T> void testJdbcTypeCompatibility(String typeName, T value, Provider<T> provider)
throws Exception {
- ingestAndAssert(typeName, value, value, value, provider, provider);
+ ingestAndAssert(typeName, value, value, value, provider, provider, false /* isIceberg */);
}
/** Simplified version where write value for streaming ingest and JDBC are the same */
@@ -230,22 +288,29 @@ void testJdbcTypeCompatibility(
Provider<JDBC_READ> selectProvider)
throws Exception {
ingestAndAssert(
- typeName, writeValue, writeValue, expectedValue, insertProvider, selectProvider);
+ typeName,
+ writeValue,
+ writeValue,
+ expectedValue,
+ insertProvider,
+ selectProvider,
+ false /* isIceberg */);
}
/**
* Ingests values with streaming ingest and JDBC driver, SELECTs them back with WHERE condition
* and asserts they exist.
*
+ * @param <STREAMING_INGEST_WRITE> Type ingested by streaming ingest
+ * @param <JDBC_WRITE> Type written by JDBC driver
+ * @param <JDBC_READ> Type read by JDBC driver
* @param dataType Snowflake data type
* @param streamingIngestWriteValue Value ingested by streaming ingest
* @param jdbcWriteValue Value written by JDBC driver
* @param expectedValue Expected value received from JDBC driver SELECT
* @param insertProvider JDBC parameter provider for INSERT
* @param selectProvider JDBC parameter provider for SELECT ... WHERE
- * @param <STREAMING_INGEST_WRITE> Type ingested by streaming ingest
- * @param <JDBC_WRITE> Type written by JDBC driver
- * @param <JDBC_READ> Type read by JDBC driver
+ * @param isIceberg whether the table is an iceberg table
*/
<STREAMING_INGEST_WRITE, JDBC_WRITE, JDBC_READ> void ingestAndAssert(
String dataType,
@@ -253,13 +318,14 @@ void ingestAndAssert(
JDBC_WRITE jdbcWriteValue,
JDBC_READ expectedValue,
Provider<JDBC_WRITE> insertProvider,
- Provider<JDBC_READ> selectProvider)
+ Provider<JDBC_READ> selectProvider,
+ boolean isIceberg)
throws Exception {
if (jdbcWriteValue == null ^ insertProvider == null)
throw new IllegalArgumentException(
"jdbcWriteValue and provider must be both null or not null");
boolean insertAlsoWithJdbc = jdbcWriteValue != null;
- String tableName = createTable(dataType);
+ String tableName = isIceberg ? createIcebergTable(dataType) : createTable(dataType);
String offsetToken = UUID.randomUUID().toString();
// Insert using JDBC
@@ -287,13 +353,13 @@ void ingestAndAssert(
if (expectedValue == null) {
selectQuery =
String.format("select count(*) from %s where %s is NULL", tableName, VALUE_COLUMN_NAME);
- } else if (dataType.startsWith("TIMESTAMP_")) {
+ } else if (dataType.toUpperCase().startsWith("TIMESTAMP")) {
selectQuery =
String.format(
"select count(*) from %s where to_varchar(%s, 'YYYY-MM-DD HH24:MI:SS.FF TZHTZM') ="
+ " ?;",
tableName, VALUE_COLUMN_NAME);
- } else if (dataType.startsWith("TIME")) {
+ } else if (dataType.toUpperCase().startsWith("TIME")) {
selectQuery =
String.format(
"select count(*) from %s where to_varchar(%s, 'HH24:MI:SS.FF TZHTZM') = ?;",
@@ -310,7 +376,9 @@ void ingestAndAssert(
Assert.assertTrue(resultSet.next());
int count = resultSet.getInt(1);
Assert.assertEquals(insertAlsoWithJdbc ? 2 : 1, count);
- migrateTable(tableName); // migration should always succeed
+ if (!isIceberg) {
+ migrateTable(tableName); // migration should always succeed
+ }
}
void assertVariant(
@@ -427,4 +495,38 @@ protected final void ingestManyAndMigrate(
TestUtils.waitForOffset(channel, offsetToken);
migrateTable(tableName); // migration should always succeed
}
+
+ protected void testIcebergIngestAndQuery(
+ String dataType,
+ Iterable