From 02e4a7048724b8f87b158235cc2be916c08ad2e2 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Thu, 26 Sep 2024 16:12:41 -0700
Subject: [PATCH] Address comments & add comments

---
 .../internal/ClientBufferParameters.java      |  6 ++
 .../internal/IcebergParquetValueParser.java   |  3 +-
 .../streaming/internal/ParquetRowBuffer.java  | 83 ++++++++++++++++---
 .../net/snowflake/ingest/utils/Utils.java     | 13 +--
 .../parquet/hadoop/BdecParquetWriter.java     | 21 ++++-
 5 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index dfadd029a..0a9711ee8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -10,6 +10,8 @@
 /** Channel's buffer relevant parameters that are set at the owning client level. */
 public class ClientBufferParameters {
+  private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
+  private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";
 
   private long maxChunkSizeInBytes;
 
@@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
   public Optional<Integer> getMaxRowGroups() {
     return maxRowGroups;
   }
+
+  public String getParquetMessageTypeName() {
+    return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
+  }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
index 93b62d7c4..8c56896c7 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
@@ -6,6 +6,7 @@
 
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
 import static net.snowflake.ingest.utils.Utils.concatDotPath;
+import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -72,7 +73,7 @@ private static ParquetBufferValue parseColumnValueToParquet(
       long insertRowsCurrIndex,
       String path,
       boolean isDescendantsOfRepeatingGroup) {
-    path = concatDotPath(path, type.getName());
+    path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
 
     float estimatedParquetSize = 0F;
     if (type.isPrimitive()) {
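The new getParquetMessageTypeName() above only changes the name of the root Parquet message: "bdec" for the classic BDEC path, "schema" for Iceberg mode. A minimal sketch of what that means for the schema written into a file's footer, using plain parquet-mr schema classes (the column name C1 is made up for illustration):

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type.Repetition;

    public class MessageTypeNameSketch {
      public static void main(String[] args) {
        PrimitiveType c1 = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "C1");
        // Same column list, different root name; this is the only difference the
        // constant swap in setupSchema() below introduces.
        System.out.println(new MessageType("bdec", c1));   // prints "message bdec { ... }"
        System.out.println(new MessageType("schema", c1)); // prints "message schema { ... }"
      }
    }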
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 5dd6b4640..38f8a3bce 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -36,7 +36,6 @@
  * converted to Parquet format for faster processing
  */
 public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
-  private static final String PARQUET_MESSAGE_TYPE_NAME = "bdec";
 
   private final Map<String, ParquetColumn> fieldIndex;
 
@@ -74,6 +73,11 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
     this.tempData = new ArrayList<>();
   }
 
+  /**
+   * Set up the Parquet schema.
+   *
+   * @param columns list of top-level column metadata
+   */
   @Override
   public void setupSchema(List<ColumnMetadata> columns) {
     fieldIndex.clear();
@@ -82,7 +86,9 @@ public void setupSchema(List<ColumnMetadata> columns) {
     metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
     List<Type> parquetTypes = new ArrayList<>();
     int id = 1;
+
     for (ColumnMetadata column : columns) {
+      /* Set up fields using top-level column information */
       validateColumnCollation(column);
       ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id);
       parquetTypes.add(typeInfo.getParquetType());
@@ -103,6 +109,10 @@ public void setupSchema(List<ColumnMetadata> columns) {
 
       if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
           || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
+        /*
+         * tempStatsMap stores stats for the current batch; keep them separate in case the
+         * current batch has invalid rows that would ruin the original stats.
+         */
         this.tempStatsMap.put(
             column.getInternalName(),
             new RowBufferStats(
@@ -115,30 +125,77 @@ public void setupSchema(List<ColumnMetadata> columns) {
       id++;
     }
-    schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes);
+    schema = new MessageType(clientBufferParameters.getParquetMessageTypeName(), parquetTypes);
+
+    /*
+     * Iceberg mode requires stats for all primitive columns and sub-columns; set them up here.
+     *
+     * There are two values that are used to identify a column in the stats map:
+     * 1. ordinal - The index of the top-level column in the schema.
+     * 2. fieldId - The id assigned to every sub-column in the schema, indexed by the level
+     *    and order of the column in the schema.
+     *    Note that the fieldId is set to 0 for non-structured columns.
+     *
+     * For example, consider the following schema:
+     *   F1 INT,
+     *   F2 STRUCT(F21 STRUCT(F211 INT), F22 INT),
+     *   F3 INT,
+     *   F4 MAP(INT, MAP(INT, INT)),
+     *   F5 INT,
+     *   F6 ARRAY(INT),
+     *   F7 INT
+     *
+     * The ordinal and fieldId will look like this:
+     *   F1: ordinal=1, fieldId=1
+     *   F2: ordinal=2, fieldId=2
+     *   F2.F21: ordinal=2, fieldId=8
+     *   F2.F21.F211: ordinal=2, fieldId=13
+     *   F2.F22: ordinal=2, fieldId=9
+     *   F3: ordinal=3, fieldId=3
+     *   F4: ordinal=4, fieldId=4
+     *   F4.key: ordinal=4, fieldId=10
+     *   F4.value: ordinal=4, fieldId=11
+     *   F4.value.key: ordinal=4, fieldId=14
+     *   F4.value.value: ordinal=4, fieldId=15
+     *   F5: ordinal=5, fieldId=5
+     *   F6: ordinal=6, fieldId=6
+     *   F6.element: ordinal=6, fieldId=12
+     *   F7: ordinal=7, fieldId=7
+     *
+     * The stats map will contain the following entries:
+     *   F1: ordinal=1, fieldId=0
+     *   F2: ordinal=2, fieldId=0
+     *   F2.F21.F211: ordinal=2, fieldId=13
+     *   F2.F22: ordinal=2, fieldId=9
+     *   F3: ordinal=3, fieldId=0
+     *   F4.key: ordinal=4, fieldId=10
+     *   F4.value.key: ordinal=4, fieldId=14
+     *   F4.value.value: ordinal=4, fieldId=15
+     *   F5: ordinal=5, fieldId=0
+     *   F6.element: ordinal=6, fieldId=12
+     *   F7: ordinal=7, fieldId=0
+     */
     if (clientBufferParameters.getIsIcebergMode()) {
-      /* Iceberg mode requires stats for sub-columns, set them up here. */
-      for (ColumnDescriptor subColumnDescriptor : schema.getColumns()) {
-        String subColumnName = concatDotPath(subColumnDescriptor.getPath());
+      for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
+        String columnPath = concatDotPath(columnDescriptor.getPath());
 
         /* set fieldId to 0 for non-structured columns */
         int fieldId =
-            subColumnDescriptor.getPath().length == 1
+            columnDescriptor.getPath().length == 1
                 ? 0
-                : subColumnDescriptor.getPrimitiveType().getId().intValue();
-        int ordinal = schema.getType(subColumnDescriptor.getPath()[0]).getId().intValue();
+                : columnDescriptor.getPrimitiveType().getId().intValue();
+        int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
 
         this.statsMap.put(
-            subColumnName,
-            new RowBufferStats(
-                subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
+            columnPath,
+            new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));
 
         if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
             || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
           this.tempStatsMap.put(
-              subColumnName,
+              columnPath,
               new RowBufferStats(
-                  subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
+                  columnPath, null /* collationDefinitionString */, ordinal, fieldId));
         }
       }
     }
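The ordinal/fieldId rule documented above can be reproduced with the same parquet-mr calls the Iceberg branch uses: ordinal comes from the id of the top-level field, fieldId from the id of the leaf, and 0 when the leaf is itself a top-level column. A self-contained sketch (the miniature two-column schema and its id assignments are hypothetical, mirroring F1 and F2.F22 from the comment):

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class StatsKeySketch {
      public static void main(String[] args) {
        MessageType schema =
            Types.buildMessage()
                .optional(PrimitiveTypeName.INT32).id(1).named("F1")
                .optionalGroup()
                    .id(2)
                    .optional(PrimitiveTypeName.INT32).id(9).named("F22")
                .named("F2")
                .named("schema");

        // The same lookup setupSchema() performs for every leaf column descriptor.
        for (ColumnDescriptor cd : schema.getColumns()) {
          String columnPath = String.join(".", cd.getPath());
          int fieldId = cd.getPath().length == 1 ? 0 : cd.getPrimitiveType().getId().intValue();
          int ordinal = schema.getType(cd.getPath()[0]).getId().intValue();
          System.out.printf("%s: ordinal=%d, fieldId=%d%n", columnPath, ordinal, fieldId);
        }
        // Output: F1: ordinal=1, fieldId=0
        //         F2.F22: ordinal=2, fieldId=9
      }
    }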
diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java
index e01e3d491..95d941036 100644
--- a/src/main/java/net/snowflake/ingest/utils/Utils.java
+++ b/src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -413,19 +413,20 @@ public static String getFullyQualifiedChannelName(
   }
 
   /*
-   * Get concat dot path, ignore null or empty string
+   * Concatenate the path parts with dots; any null or empty part is an error
    *
    * @param path the path
    */
   public static String concatDotPath(String... path) {
     StringBuilder sb = new StringBuilder();
     for (String p : path) {
-      if (!isNullOrEmpty(p)) {
-        if (sb.length() > 0) {
-          sb.append(".");
-        }
-        sb.append(p);
+      if (isNullOrEmpty(p)) {
+        throw new IllegalArgumentException("Path cannot be null or empty");
       }
+      if (sb.length() > 0) {
+        sb.append(".");
+      }
+      sb.append(p);
     }
     return sb.toString();
   }
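concatDotPath used to skip null or empty parts silently, which could quietly produce a wrong key such as "F4.key" instead of "F4.value.key"; it now fails fast, which is why the IcebergParquetValueParser hunk earlier guards the empty root path before calling it. A self-contained sketch of the new contract (the class name is made up; the method body mirrors the patched Utils.concatDotPath):

    public class ConcatDotPathSketch {
      /* Mirrors the patched Utils.concatDotPath: every part must be non-null and non-empty. */
      static String concatDotPath(String... path) {
        StringBuilder sb = new StringBuilder();
        for (String p : path) {
          if (p == null || p.isEmpty()) {
            throw new IllegalArgumentException("Path cannot be null or empty");
          }
          if (sb.length() > 0) {
            sb.append(".");
          }
          sb.append(p);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        System.out.println(concatDotPath("F4", "value", "key")); // F4.value.key
        try {
          // Before this patch the empty part was skipped (result "F4.key"); now callers
          // with a possibly-empty root must guard first, as parseColumnValueToParquet does:
          //   path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
          concatDotPath("F4", "", "key");
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }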
struct", fieldName)); + } recordConsumer.endGroup(); } }