[Do not merge] TMP #866

Closed
wants to merge 2 commits into from
@@ -418,18 +418,31 @@ private static ParquetBufferValue getStructValue(
        DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
    Set<String> extraFields = new HashSet<>(structVal.keySet());
    List<Object> listVal = new ArrayList<>(type.getFieldCount());

    float estimatedParquetSize = 0f;
    for (int i = 0; i < type.getFieldCount(); i++) {
      StringBuilder sb = new StringBuilder();
Collaborator
Doing this StringBuilder business on the hot path will make performance even worse.

Have two suggestions:

Option 1:

  • When parsing the schema, in IcebergDataTypeParser, set the struct field's name to the field id.
  • When validating the input row, in IcebergParquetValueParser (over here, next to this comment): (i) switch the logic to loop over the structVal key set instead of type's fields, (ii) for each key, do a lookup to get the field id (you'll have to pipe through a lookup map and maintain it somewhere too), (iii) use this fieldId as the "field name" when calling type.getType. Note that type.getType has an overload that takes field names.

Option 2:

  • When parsing the schema, in IcebergDataTypeParser, set the struct's field name to getEscapedString(fieldName) and let AvroSchemaUtil do its thing on whatever we pass in.
  • When validating the input row, in IcebergParquetValueParser, (i) switch the logic to loop over the structVal key set instead of type's fields, (ii) for each key, call type.getFieldName(getEscapedString(key))
  • getEscapedString(String str) should be implemented as: return str.replace("_", "_x" + Integer.toHexString('_').toUpperCase())
    This, combined with AvroSchemaUtil's internal behavior of escaping every non-digit, non-letter, non-underscore character, will make sure we are able to generate safe, non-colliding names. tl;dr: with this fix we're making sure everything except digits and letters gets escaped properly, effectively fixing AvroSchemaUtil's bug.
    In both cases, switching what we iterate on will need to be accompanied by fixes to how the extraFields collection's bookkeeping is done.

I prefer option 1 because it completely sidesteps AvroSchemaUtil's buggy behavior. But I'm not sure if there is an easy way to maintain this per-struct-type map somewhere and handle all nested struct situations where there's a series of structs embedded inside each other. Still putting it out here in case you can come up with something.
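The one-liner in Option 2 can be sketched as a standalone helper (this is a sketch of the suggestion above, not merged code; note that `Integer.toHexString` takes an int, so the underscore must be a char literal):

```java
public class EscapeSketch {
    // '_' is code point 0x5F, so every underscore becomes the literal "_x5F".
    // Combined with escaping every other non-letter, non-digit character,
    // distinct input names can no longer collide after sanitization.
    static String getEscapedString(String str) {
        return str.replace("_", "_x" + Integer.toHexString('_').toUpperCase());
    }

    public static void main(String[] args) {
        System.out.println(getEscapedString("my_field")); // my_x5Ffield
    }
}
```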

Contributor Author
Sounds good, will try option 1
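Sketched roughly, the per-struct lookup that Option 1 calls for might look like this (all names here are hypothetical illustrations, not code from this PR):

```java
import java.util.HashMap;
import java.util.Map;

public class Option1Sketch {
    // Hypothetical map built while parsing the schema:
    // original field name -> Iceberg field id.
    private final Map<String, Integer> fieldIds = new HashMap<>();

    void register(String originalName, int fieldId) {
        fieldIds.put(originalName, fieldId);
    }

    // During row validation, each input key resolves to its field id, and the
    // id (as a string) doubles as the Parquet field name, so AvroSchemaUtil's
    // name mangling never gets a chance to cause collisions.
    String resolve(String inputKey) {
        Integer id = fieldIds.get(inputKey);
        if (id == null) {
            // in the real code this would feed the extraFields bookkeeping instead
            throw new IllegalArgumentException("Extra field: " + inputKey);
        }
        return String.valueOf(id);
    }

    public static void main(String[] args) {
        Option1Sketch struct = new Option1Sketch();
        struct.register("my_field", 3);
        System.out.println(struct.resolve("my_field")); // 3
    }
}
```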

      String fieldName = type.getFieldName(i);
      for (int j = 0; j < fieldName.length(); j++) {
        if (fieldName.charAt(j) == '_') {
          sb.append((char) Integer.parseInt(fieldName.substring(j + 2, j + 4), 16));
          j += 3;
        } else {
          sb.append(fieldName.charAt(j));
        }
      }
      String originalFieldName = sb.substring(0, sb.toString().lastIndexOf('_'));
Contributor Author
We need this for extra field validation.


      ParquetBufferValue parsedValue =
          parseColumnValueToParquet(
              structVal.getOrDefault(type.getFieldName(i), null),
              structVal.getOrDefault(originalFieldName, null),
              type.getType(i),
              statsMap,
              defaultTimezone,
              insertRowsCurrIndex,
              path,
              isDescendantsOfRepeatingGroup);
      extraFields.remove(type.getFieldName(i));
      extraFields.remove(originalFieldName);
      listVal.add(parsedValue.getValue());
      estimatedParquetSize += parsedValue.getSize();
    }
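For reference, the encode/decode pair in this change (encoding in IcebergDataTypeParser, decoding in the loop above) can be exercised standalone like this (class and method names are illustrative only):

```java
public class FieldNameCodec {
    // Mirrors the PR's encoding: append "_" + fieldId, then hex-escape every
    // underscore (including the separator) as "_x5F".
    static String encode(String name, int id) {
        return (name + "_" + id).replace("_", "_x5F");
    }

    // Mirrors the decode loop: each "_xHH" becomes the character with hex code
    // HH, then everything from the last '_' (the "_<id>" suffix) is dropped.
    static String decode(String encoded) {
        StringBuilder sb = new StringBuilder();
        for (int j = 0; j < encoded.length(); j++) {
            if (encoded.charAt(j) == '_') {
                sb.append((char) Integer.parseInt(encoded.substring(j + 2, j + 4), 16));
                j += 3; // skip over "xHH"
            } else {
                sb.append(encoded.charAt(j));
            }
        }
        return sb.substring(0, sb.lastIndexOf("_"));
    }

    public static void main(String[] args) {
        System.out.println(encode("my_field", 7));         // my_x5Ffield_x5F7
        System.out.println(decode(encode("my_field", 7))); // my_field
    }
}
```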
@@ -86,7 +86,9 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
  public void setupSchema(List<ColumnMetadata> columns) {
    fieldIndex.clear();
    metadata.clear();
    metadata.put("sfVer", "1,1");
    if (!clientBufferParameters.getIsIcebergMode()) {
      metadata.put("sfVer", "1,1");
    }
    List<Type> parquetTypes = new ArrayList<>();
    int id = 1;

@@ -154,7 +154,7 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
field.isObject(), "Cannot parse struct field from non-object: %s", field);

int id = JsonUtil.getInt(ID, field);
String name = JsonUtil.getString(NAME, field);
String name = (JsonUtil.getString(NAME, field) + "_" + id).replace("_", "_x5F");
Contributor Author
Encode every "_" as its hex form; same idea as the escape suggestion above.

Type type = getTypeFromJson(field.get(TYPE));

String doc = JsonUtil.getStringOrNull(DOC, field);
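The bug this change works around can be seen with a toy version of the sanitization: because '_' itself is allowed through unescaped, an escaped character and a literal escape sequence become indistinguishable. (The sanitize rule below is a simplification for illustration, not AvroSchemaUtil's actual implementation.)

```java
public class CollisionDemo {
    // Simplified stand-in for the sanitization discussed above: letters,
    // digits, and underscores pass through; everything else becomes
    // "_x" + the character's hex code.
    static String sanitize(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (Character.isLetterOrDigit(c) || c == '_') {
                sb.append(c);
            } else {
                sb.append("_x").append(Integer.toHexString(c).toUpperCase());
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // '.' escapes to "_x2E", but a literal "_x2E" passes through untouched:
        // two distinct input names, one sanitized form.
        System.out.println(sanitize("col.name"));    // col_x2Ename
        System.out.println(sanitize("col_x2Ename")); // col_x2Ename
    }
}
```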
@@ -1,10 +1,18 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.datatypes;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
@@ -26,6 +34,8 @@ public static Object[][] parameters() {
@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

static final Random generator = new Random(0x5EED);

@Before
public void before() throws Exception {
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
@@ -306,6 +316,7 @@ public void testDecimal() throws Exception {
testIcebergIngestion("decimal(3, 1)", 12.5f, new FloatProvider());
testIcebergIngestion("decimal(3, 1)", -99, new IntProvider());
testIcebergIngestion("decimal(38, 0)", Long.MAX_VALUE, new LongProvider());
testIcebergIngestion("decimal(21, 0)", .0, new DoubleProvider());
testIcebergIngestion("decimal(38, 10)", null, new BigDecimalProvider());

testIcebergIngestion(
@@ -368,5 +379,48 @@ public void testDecimalAndQueries() throws Exception {
Arrays.asList(new BigDecimal("-12.3"), new BigDecimal("-12.3"), null),
"select COUNT({columnName}) from {tableName} where {columnName} = -12.3",
Arrays.asList(2L));

List<Object> bigDecimals_9_4 = randomBigDecimal(200, 9, 4);
testIcebergIngestAndQuery(
"decimal(9, 4)", bigDecimals_9_4, "select {columnName} from {tableName}", bigDecimals_9_4);

List<Object> bigDecimals_18_9 = randomBigDecimal(200, 18, 9);
testIcebergIngestAndQuery(
"decimal(18, 9)",
bigDecimals_18_9,
"select {columnName} from {tableName}",
bigDecimals_18_9);

List<Object> bigDecimals_21_0 = randomBigDecimal(200, 21, 0);
testIcebergIngestAndQuery(
"decimal(21, 0)",
bigDecimals_21_0,
"select {columnName} from {tableName}",
bigDecimals_21_0);

List<Object> bigDecimals_38_10 = randomBigDecimal(200, 38, 10);
testIcebergIngestAndQuery(
"decimal(38, 10)",
bigDecimals_38_10,
"select {columnName} from {tableName}",
bigDecimals_38_10);
}

  private static List<Object> randomBigDecimal(int count, int precision, int scale) {
    List<Object> list = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      int intPart = generator.nextInt(precision - scale + 1);
      int floatPart = generator.nextInt(scale + 1);
      if (intPart == 0 && floatPart == 0) {
        list.add(null);
        continue;
      }
      list.add(
          new BigDecimal(
              RandomStringUtils.randomNumeric(intPart)
                  + "."
                  + RandomStringUtils.randomNumeric(floatPart)));
    }
    return list;
  }
}