Address comments & add comments
sfc-gh-alhuang committed Sep 26, 2024
1 parent a1ab107 commit 02e4a70
Showing 5 changed files with 102 additions and 24 deletions.
@@ -10,6 +10,8 @@

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";

private long maxChunkSizeInBytes;

@@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}

public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}
}
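
For reference, a minimal sketch of how the new accessor feeds Parquet schema construction in ParquetRowBuffer (the parquetTypes list here is illustrative; only getParquetMessageTypeName() and the MessageType call come from this change):

    // "schema" when the client is in Iceberg mode, "bdec" otherwise (see the constants above)
    String messageTypeName = clientBufferParameters.getParquetMessageTypeName();
    MessageType schema = new MessageType(messageTypeName, parquetTypes);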
@@ -6,6 +6,7 @@

import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
import static net.snowflake.ingest.utils.Utils.concatDotPath;
import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;

import java.math.BigDecimal;
import java.math.BigInteger;
@@ -72,7 +73,7 @@ private static ParquetBufferValue parseColumnValueToParquet(
long insertRowsCurrIndex,
String path,
boolean isDescendantsOfRepeatingGroup) {
-    path = concatDotPath(path, type.getName());
+    path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
float estimatedParquetSize = 0F;

if (type.isPrimitive()) {
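For illustration, a minimal sketch of how the guarded concatenation builds nested column paths; appendField is a hypothetical helper mirroring the changed line above, and the field names are made up:

    // Hypothetical helper, not part of this diff: the top-level call starts with an empty path,
    // so the field name is taken as-is instead of going through the now-strict concatDotPath.
    static String appendField(String path, String fieldName) {
      return Utils.isNullOrEmpty(path) ? fieldName : Utils.concatDotPath(path, fieldName);
    }

    // appendField("", "F2")           -> "F2"
    // appendField("F2", "F21")        -> "F2.F21"
    // appendField("F2.F21", "F211")   -> "F2.F21.F211"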
@@ -36,7 +36,6 @@
* converted to Parquet format for faster processing
*/
public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
-  private static final String PARQUET_MESSAGE_TYPE_NAME = "bdec";

private final Map<String, ParquetColumn> fieldIndex;

@@ -74,6 +73,11 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
this.tempData = new ArrayList<>();
}

/**
* Set up the parquet schema.
*
   * @param columns list of column metadata for the top-level columns
*/
@Override
public void setupSchema(List<ColumnMetadata> columns) {
fieldIndex.clear();
@@ -82,7 +86,9 @@ public void setupSchema(List<ColumnMetadata> columns) {
metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
List<Type> parquetTypes = new ArrayList<>();
int id = 1;

for (ColumnMetadata column : columns) {
/* Set up fields using top level column information */
validateColumnCollation(column);
ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id);
parquetTypes.add(typeInfo.getParquetType());
@@ -103,6 +109,10 @@

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
        /*
         * tempStatsMap stores stats for the current batch. Keep it separate so that
         * invalid rows in the current batch do not corrupt the original stats.
         */
this.tempStatsMap.put(
column.getInternalName(),
new RowBufferStats(
@@ -115,30 +125,77 @@

id++;
}
-    schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes);
+    schema = new MessageType(clientBufferParameters.getParquetMessageTypeName(), parquetTypes);

/*
* Iceberg mode requires stats for all primitive columns and sub-columns, set them up here.
*
* There are two values that are used to identify a column in the stats map:
* 1. ordinal - The ordinal is the index of the top level column in the schema.
* 2. fieldId - The fieldId is the id of all sub-columns in the schema.
* It's indexed by the level and order of the column in the schema.
* Note that the fieldId is set to 0 for non-structured columns.
*
* For example, consider the following schema:
* F1 INT,
* F2 STRUCT(F21 STRUCT(F211 INT), F22 INT),
* F3 INT,
* F4 MAP(INT, MAP(INT, INT)),
* F5 INT,
* F6 ARRAY(INT),
* F7 INT
*
* The ordinal and fieldId will look like this:
* F1: ordinal=1, fieldId=1
* F2: ordinal=2, fieldId=2
* F2.F21: ordinal=2, fieldId=8
* F2.F21.F211: ordinal=2, fieldId=13
* F2.F22: ordinal=2, fieldId=9
* F3: ordinal=3, fieldId=3
* F4: ordinal=4, fieldId=4
* F4.key: ordinal=4, fieldId=10
* F4.value: ordinal=4, fieldId=11
* F4.value.key: ordinal=4, fieldId=14
* F4.value.value: ordinal=4, fieldId=15
* F5: ordinal=5, fieldId=5
* F6: ordinal=6, fieldId=6
* F6.element: ordinal=6, fieldId=12
* F7: ordinal=7, fieldId=7
*
* The stats map will contain the following entries:
* F1: ordinal=1, fieldId=0
* F2: ordinal=2, fieldId=0
* F2.F21.F211: ordinal=2, fieldId=13
* F2.F22: ordinal=2, fieldId=9
* F3: ordinal=3, fieldId=0
* F4.key: ordinal=4, fieldId=10
* F4.value.key: ordinal=4, fieldId=14
* F4.value.value: ordinal=4, fieldId=15
* F5: ordinal=5, fieldId=0
* F6.element: ordinal=6, fieldId=12
* F7: ordinal=7, fieldId=0
*/
if (clientBufferParameters.getIsIcebergMode()) {
-      /* Iceberg mode requires stats for sub-columns, set them up here. */
-      for (ColumnDescriptor subColumnDescriptor : schema.getColumns()) {
-        String subColumnName = concatDotPath(subColumnDescriptor.getPath());
+      for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
+        String columnPath = concatDotPath(columnDescriptor.getPath());

/* set fieldId to 0 for non-structured columns */
int fieldId =
-              subColumnDescriptor.getPath().length == 1
+              columnDescriptor.getPath().length == 1
                  ? 0
-                  : subColumnDescriptor.getPrimitiveType().getId().intValue();
-          int ordinal = schema.getType(subColumnDescriptor.getPath()[0]).getId().intValue();
+                  : columnDescriptor.getPrimitiveType().getId().intValue();
+          int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();

this.statsMap.put(
-            subColumnName,
-            new RowBufferStats(
-                subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
+            columnPath,
+            new RowBufferStats(columnPath, null /* collationDefinitionString */, ordinal, fieldId));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
-              subColumnName,
+              columnPath,
              new RowBufferStats(
-                  subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
+                  columnPath, null /* collationDefinitionString */, ordinal, fieldId));
}
}
}
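As a side note on the loop above, a minimal sketch of the Parquet API it relies on: MessageType.getColumns() returns one ColumnDescriptor per leaf (primitive) column, and getPath() exposes the path segments that concatDotPath joins into the stats-map key. The schema names follow the example in the comment; the helper itself is illustrative:

    // Sketch only: prints the leaf column paths of a schema, e.g. "F1", "F2.F21.F211", "F2.F22".
    static void printLeafColumnPaths(org.apache.parquet.schema.MessageType schema) {
      for (org.apache.parquet.column.ColumnDescriptor cd : schema.getColumns()) {
        System.out.println(String.join(".", cd.getPath()));
      }
    }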
src/main/java/net/snowflake/ingest/utils/Utils.java (13 changes: 7 additions & 6 deletions)
@@ -413,19 +413,20 @@ public static String getFullyQualifiedChannelName(
}

/*
-   * Get concat dot path, ignore null or empty string
+   * Get concat dot path, check if any path is empty or null
*
* @param path the path
*/
public static String concatDotPath(String... path) {
StringBuilder sb = new StringBuilder();
for (String p : path) {
-      if (!isNullOrEmpty(p)) {
-        if (sb.length() > 0) {
-          sb.append(".");
-        }
-        sb.append(p);
+      if (isNullOrEmpty(p)) {
+        throw new IllegalArgumentException("Path cannot be null or empty");
      }
+      if (sb.length() > 0) {
+        sb.append(".");
+      }
+      sb.append(p);
}
return sb.toString();
}
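
For illustration, the resulting behavior of concatDotPath (segment names are arbitrary):

    Utils.concatDotPath("F2", "F21", "F211"); // -> "F2.F21.F211"
    Utils.concatDotPath("F1");                // -> "F1"
    Utils.concatDotPath("F1", "");            // now throws IllegalArgumentException
    // previously, null or empty segments were silently skipped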
src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java (21 changes: 17 additions & 4 deletions)
@@ -282,7 +282,8 @@ public void prepareForWrite(RecordConsumer recordConsumer) {

@Override
public void write(List<Object> values) {
-    List<Type> cols = schema.getFields();
+    List<Type> cols =
+        schema.getFields(); /* getFields() returns top level columns in the schema */
if (values.size() != cols.size()) {
throw new ParquetEncodingException(
"Invalid input data in channel '"
@@ -301,7 +302,7 @@ public void write(List<Object> values) {
recordConsumer.endMessage();
}

-  private void writeValues(List<Object> values, GroupType type) {
+  private void writeValues(List<?> values, GroupType type) {
List<Type> cols = type.getFields();
for (int i = 0; i < cols.size(); ++i) {
Object val = values.get(i);
@@ -344,16 +345,28 @@ private void writeValues(List<Object> values, GroupType type) {
}
} else {
if (cols.get(i).isRepetition(Type.Repetition.REPEATED)) {
/* List and Map */
for (Object o : values) {
recordConsumer.startGroup();
if (o != null) {
-              writeValues((List<Object>) o, cols.get(i).asGroupType());
+              if (o instanceof List) {
+                writeValues((List<?>) o, cols.get(i).asGroupType());
+              } else {
+                throw new ParquetEncodingException(
+                    String.format("Field %s should be a 3 level list or map", fieldName));
+              }
}
recordConsumer.endGroup();
}
} else {
/* Struct */
recordConsumer.startGroup();
-          writeValues((List<Object>) val, cols.get(i).asGroupType());
+          if (val instanceof List) {
+            writeValues((List<?>) val, cols.get(i).asGroupType());
+          } else {
+            throw new ParquetEncodingException(
+                String.format("Field %s should be a 2 level struct", fieldName));
+          }
recordConsumer.endGroup();
}
}
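A hypothetical illustration of what the added instanceof checks buy, assuming a two-column schema (a struct followed by a primitive) and a row that eventually reaches the write(...) method above (how rows get there is outside this diff):

    java.util.List<Object> row = java.util.Arrays.asList("not-a-list", 42);
    // Before: writeValues hit the unchecked (List<Object>) cast and failed with a ClassCastException.
    // After: it fails with ParquetEncodingException("Field <name> should be a 2 level struct").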
