Skip to content

Commit

Permalink
Structured data type
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Sep 24, 2024
1 parent 0919151 commit d269eed
Show file tree
Hide file tree
Showing 16 changed files with 1,335 additions and 135 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,80 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}

/**
 * Validate and cast an Iceberg struct column value to {@code Map<String, Object>}. Allowed Java
 * type:
 *
 * <ul>
 *   <li>{@code Map<String, Object>}
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Map
 */
static Map<String, Object> validateAndParseIcebergStruct(
    String columnName, Object input, long insertRowIndex) {
  if (!(input instanceof Map)) {
    throw typeNotAllowedException(
        columnName,
        input.getClass(),
        "STRUCT",
        new String[] {"Map<String, Object>"},
        insertRowIndex);
  }
  // Struct field names must all be Strings; reject any non-String key before the cast below.
  if (!((Map<?, ?>) input).keySet().stream().allMatch(key -> key instanceof String)) {
    throw new SFException(
        ErrorCode.INVALID_FORMAT_ROW,
        String.format(
            "Field name of a struct must be of type String, rowIndex:%d", insertRowIndex));
  }

  // Keys were verified above; value types are intentionally left unchecked here.
  @SuppressWarnings("unchecked")
  Map<String, Object> structInput = (Map<String, Object>) input;
  return structInput;
}

/**
 * Validate and cast an Iceberg list column value to an {@link Iterable}. Allowed Java type:
 *
 * <ul>
 *   <li>{@code Iterable}
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Iterable
 */
static Iterable<Object> validateAndParseIcebergList(
    String columnName, Object input, long insertRowIndex) {
  // Anything iterable is accepted as-is; everything else is a type error for this column.
  if (input instanceof Iterable) {
    return (Iterable<Object>) input;
  }
  throw typeNotAllowedException(
      columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
}

/**
 * Validate and cast an Iceberg map column value to a {@code Map<Object, Object>}. Allowed Java
 * type:
 *
 * <ul>
 *   <li>{@code Map<Object, Object>}
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Map
 */
static Map<Object, Object> validateAndParseIcebergMap(
    String columnName, Object input, long insertRowIndex) {
  // Any Map implementation is accepted; keys/values are validated downstream per element type.
  if (input instanceof Map) {
    return (Map<Object, Object>) input;
  }
  throw typeNotAllowedException(
      columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
}

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
private int fieldId;
private String minStrValue;

private String maxStrValue;
Expand Down Expand Up @@ -84,6 +98,38 @@ class FileColumnProperties {
this.setDistinctValues(stats.getDistinctValues());
}

/**
 * Builds column EP (expression properties) metadata from Parquet column {@link Statistics},
 * keyed by the Iceberg field id. The field id doubles as the column ordinal here.
 *
 * <p>Min/max values are mapped by statistics subtype: booleans to 0/1 integers, int/long to
 * BigInteger, float/double to Double, and binary either to BigInteger (decimal logical type,
 * unscaled two's-complement bytes — presumably Parquet's DECIMAL binary encoding; confirm) or
 * to truncated hex strings. NDV is reported as unknown.
 */
FileColumnProperties(int fieldId, Statistics<?> statistics) {
this.setColumnOrdinal(fieldId);
this.setFieldId(fieldId);
this.setNullCount(statistics.getNumNulls());
// Distinct-value count is not tracked for Parquet-native stats.
this.setDistinctValues(EP_NDV_UNKNOWN);
// Collation and non-collated string bounds do not apply to this path.
this.setCollation(null);
this.setMaxStrNonCollated(null);
this.setMinStrNonCollated(null);

if (statistics instanceof BooleanStatistics) {
// Booleans are encoded as integer bounds: false -> 0, true -> 1.
this.setMinIntValue(
((BooleanStatistics) statistics).genericGetMin() ? BigInteger.ONE : BigInteger.ZERO);
this.setMaxIntValue(
((BooleanStatistics) statistics).genericGetMax() ? BigInteger.ONE : BigInteger.ZERO);
} else if (statistics instanceof IntStatistics || statistics instanceof LongStatistics) {
this.setMinIntValue(BigInteger.valueOf(((Number) statistics.genericGetMin()).longValue()));
this.setMaxIntValue(BigInteger.valueOf(((Number) statistics.genericGetMax()).longValue()));
} else if (statistics instanceof FloatStatistics || statistics instanceof DoubleStatistics) {
this.setMinRealValue((Double) statistics.genericGetMin());
this.setMaxRealValue((Double) statistics.genericGetMax());
} else if (statistics instanceof BinaryStatistics) {
if (statistics.type().getLogicalTypeAnnotation()
instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
// Decimal binary: interpret raw bytes as a signed big-endian unscaled integer.
this.setMinIntValue(new BigInteger(statistics.getMinBytes()));
this.setMaxIntValue(new BigInteger(statistics.getMaxBytes()));
} else {
// Other binary types: store hex-truncated bounds (min rounded down, max rounded up).
this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false));
this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true));
}
}
// NOTE(review): statistics with no non-null values would leave all bounds unset — confirm
// callers only pass populated statistics.
}

@JsonProperty("columnId")
public int getColumnOrdinal() {
return columnOrdinal;
Expand All @@ -93,6 +139,15 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}

// Iceberg field id for this column; serialized as "fieldId" in the register request payload.
@JsonProperty("fieldId")
public int getFieldId() {
return fieldId;
}

public void setFieldId(int fieldId) {
this.fieldId = fieldId;
}

// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand All @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Pair;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

/**
* Interface to convert {@link ChannelData} buffered in {@link RowBuffer} to the underlying format
Expand Down Expand Up @@ -39,20 +40,23 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final List<BlockMetaData> blocksMetadata;

/**
 * Bundles the outcome of serializing buffered channel data into one chunk.
 *
 * @param channelsMetadataList metadata for each channel contributing to the chunk
 * @param columnEpStatsMapCombined combined per-column EP statistics, keyed by column name
 * @param rowCount total number of rows serialized into the chunk
 * @param chunkEstimatedUncompressedSize estimated uncompressed size of the chunk data
 * @param chunkData the serialized chunk bytes
 * @param chunkMinMaxInsertTimeInMs min/max insert timestamps (ms) across the chunk's rows
 * @param blocksMetadata Parquet row-group (block) metadata produced during serialization
 */
public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
List<BlockMetaData> blocksMetadata) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.blocksMetadata = blocksMetadata;
}
}
}
Loading

0 comments on commit d269eed

Please sign in to comment.