diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index 7c2f90710..d29178a32 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -492,6 +492,11 @@ private static ParquetBufferValue get3LevelListValue( listVal.add(Collections.singletonList(parsedValue.getValue())); estimatedParquetSize += parsedValue.getSize(); } + if (listVal.isEmpty()) { + subColumnFinder + .getSubColumns(path) + .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount()); + } return new ParquetBufferValue(listVal, estimatedParquetSize); } @@ -539,6 +544,11 @@ private static ParquetBufferValue get3LevelMapValue( listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue())); estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize(); } + if (listVal.isEmpty()) { + subColumnFinder + .getSubColumns(path) + .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount()); + } return new ParquetBufferValue(listVal, estimatedParquetSize); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java index 6d61c8eea..8a3984c08 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java @@ -286,6 +286,9 @@ Double getCurrentMaxRealValue() { void incCurrentNullCount() { this.currentNullCount += 1; + if (enableValuesCount) { + numberOfValues++; + } } long getCurrentNullCount() { diff --git a/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java index c3d18cae5..c1be2818e 100644 --- a/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/SnowflakeParquetWriter.java @@ -389,13 +389,14 @@ private void writeValues(List values, GroupType type) { } } else { /* Struct */ - recordConsumer.startGroup(); - if (val instanceof List) { - writeValues((List) val, cols.get(i).asGroupType()); - } else { + if (!(val instanceof List)) { throw new ParquetEncodingException( String.format("Field %s should be a 2 level struct", fieldName)); } + recordConsumer.startGroup(); + if (!((List) val).isEmpty()) { + writeValues((List) val, cols.get(i).asGroupType()); + } recordConsumer.endGroup(); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java index a00cb538e..a145c86a8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferStatsTest.java @@ -180,7 +180,7 @@ public void testGetCombinedStats() throws Exception { Assert.assertEquals(BigInteger.valueOf(1), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); - Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); + Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinStrValue()); @@ -239,7 +239,7 @@ public void testGetCombinedStats() throws Exception { Assert.assertEquals(2, result.getCurrentNullCount()); Assert.assertEquals(5, result.getCurrentMaxLength()); Assert.assertEquals(enableNDVAndNV ? 7 : -1, result.getDistinctValues()); - Assert.assertEquals(enableNDVAndNV ? 8 : -1, result.getNumberOfValues()); + Assert.assertEquals(enableNDVAndNV ? 10 : -1, result.getNumberOfValues()); Assert.assertNull(result.getCurrentMinRealValue()); Assert.assertNull(result.getCurrentMaxRealValue()); @@ -264,7 +264,7 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertEquals(BigInteger.valueOf(2), result.getCurrentMinIntValue()); Assert.assertEquals(BigInteger.valueOf(8), result.getCurrentMaxIntValue()); Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); + Assert.assertEquals(enableNDVAndNV ? 6 : -1, result.getNumberOfValues()); Assert.assertEquals(2, result.getCurrentNullCount()); @@ -309,7 +309,7 @@ public void testGetCombinedStatsNull() throws Exception { Assert.assertArrayEquals("g".getBytes(StandardCharsets.UTF_8), result.getCurrentMaxStrValue()); Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getDistinctValues()); - Assert.assertEquals(enableNDVAndNV ? 4 : -1, result.getNumberOfValues()); + Assert.assertEquals(enableNDVAndNV ? 5 : -1, result.getNumberOfValues()); Assert.assertEquals(1, result.getCurrentNullCount()); Assert.assertNull(result.getCurrentMinRealValue()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 246753fbe..042834ce7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -2097,7 +2097,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { .isEqualTo("key2".getBytes(StandardCharsets.UTF_8)); assertThat(columnEpStats.get("COLMAP.key_value.key").getCurrentNullCount()).isEqualTo(1); assertThat(columnEpStats.get("COLMAP.key_value.key").getDistinctValues()).isEqualTo(2); - assertThat(columnEpStats.get("COLMAP.key_value.key").getNumberOfValues()).isEqualTo(2); + assertThat(columnEpStats.get("COLMAP.key_value.key").getNumberOfValues()).isEqualTo(3); assertThat(columnEpStats.get("COLMAP.key_value.value").getCurrentMinIntValue()) .isEqualTo(BigInteger.ONE); @@ -2105,7 +2105,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { .isEqualTo(BigInteger.ONE); assertThat(columnEpStats.get("COLMAP.key_value.value").getCurrentNullCount()).isEqualTo(1); assertThat(columnEpStats.get("COLMAP.key_value.value").getDistinctValues()).isEqualTo(1); - assertThat(columnEpStats.get("COLMAP.key_value.value").getNumberOfValues()).isEqualTo(2); + assertThat(columnEpStats.get("COLMAP.key_value.value").getNumberOfValues()).isEqualTo(3); assertThat(columnEpStats.get("COLARRAY.list.element").getCurrentMinIntValue()) .isEqualTo(BigInteger.ONE); @@ -2113,7 +2113,7 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { .isEqualTo(BigInteger.ONE); assertThat(columnEpStats.get("COLARRAY.list.element").getCurrentNullCount()).isEqualTo(1); assertThat(columnEpStats.get("COLARRAY.list.element").getDistinctValues()).isEqualTo(1); - assertThat(columnEpStats.get("COLARRAY.list.element").getNumberOfValues()).isEqualTo(4); + assertThat(columnEpStats.get("COLARRAY.list.element").getNumberOfValues()).isEqualTo(5); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index ddf473dc5..73e3700f1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -46,6 +46,8 @@ public void testStructuredDataType() throws Exception { "object(a int, b string, c boolean)", "{\"a\": 1, \"b\": \"test\", \"c\": true}"); assertStructuredDataType("map(string, int)", "{\"key1\": 1}"); assertStructuredDataType("array(int)", "[1, 2, 3]"); + assertStructuredDataType("array(string) not null", "[]"); + assertStructuredDataType("map(string, int) not null", "{}"); assertMap( "map(string, int)", new HashMap() { @@ -95,59 +97,30 @@ public void testStructuredDataType() throws Exception { .isInstanceOf(SFException.class) .extracting("vendorCode") .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + } - /* Nested data types. Should be fixed. Fixed in server side. */ - Assertions.assertThatThrownBy( - () -> assertStructuredDataType("array(array(int))", "[[1, 2], [3, 4]]")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "array(map(string, int))", "[{\"key1\": 1}, {\"key2\": 2}]")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "array(object(a int, b string, c boolean))", - "[{\"a\": 1, \"b\": \"test\", \"c\": true}]")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "map(string, object(a int, b string, c boolean))", - "{\"key1\": {\"a\": 1, \"b\": \"test\", \"c\": true}}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> assertStructuredDataType("map(string, array(int))", "{\"key1\": [1, 2, 3]}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "map(string, map(string, int))", "{\"key1\": {\"key2\": 2}}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "map(string, array(array(int)))", "{\"key1\": [[1, 2], [3, 4]]}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "map(string, array(map(string, int)))", - "{\"key1\": [{\"key2\": 2}, {\"key3\": 3}]}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "map(string, array(object(a int, b string, c boolean)))", - "{\"key1\": [{\"a\": 1, \"b\": \"test\", \"c\": true}]}")) - .isInstanceOf(SFException.class); - Assertions.assertThatThrownBy( - () -> - assertStructuredDataType( - "object(a int, b array(int), c map(string, int))", - "{\"a\": 1, \"b\": [1, 2, 3], \"c\": {\"key1\": 1}}")) - .isInstanceOf(SFException.class); + @Test + public void testNestedDataType() throws Exception { + assertStructuredDataType("map(string, map(string, object(a int, b string)))", "{}"); + assertStructuredDataType("array(map(string, int))", "[{}]"); + assertStructuredDataType("array(array(int))", "[[1, 2], [3, 4]]"); + assertStructuredDataType("array(map(string, int))", "[{\"key1\": 1}, {\"key2\": 2}]"); + assertStructuredDataType( + "array(object(a int, b string, c boolean))", "[{\"a\": 1, \"b\": \"test\", \"c\": true}]"); + assertStructuredDataType( + "map(string, object(a int, b string, c boolean))", + "{\"key1\": {\"a\": 1, \"b\": \"test\", \"c\": true}}"); + assertStructuredDataType("map(string, array(int))", "{\"key1\": [1, 2, 3]}"); + assertStructuredDataType("map(string, map(string, int))", "{\"key1\": {\"key2\": 2}}"); + assertStructuredDataType("map(string, array(array(int)))", "{\"key1\": [[1, 2], [3, 4]]}"); + assertStructuredDataType( + "map(string, array(map(string, int)))", "{\"key1\": [{\"key2\": 2}, {\"key3\": 3}]}"); + assertStructuredDataType( + "map(string, array(object(a int, b string, c boolean)))", + "{\"key1\": [{\"a\": 1, \"b\": \"test\", \"c\": true}]}"); + assertStructuredDataType( + "object(a int, b array(int), c map(string, int))", + "{\"a\": 1, \"b\": [1, 2, 3], \"c\": {\"key1\": 1}}"); } private void assertStructuredDataType(String dataType, String value) throws Exception {