Skip to content

Commit

Permalink
Structured data type
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Sep 26, 2024
1 parent 7b3881b commit b4555cf
Show file tree
Hide file tree
Showing 15 changed files with 1,334 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,80 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}

/**
 * Validate and cast Iceberg struct column to Map&lt;String, Object&gt;. Allowed Java type:
 *
 * <ul>
 *   <li>Map&lt;String, Object&gt;
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Map
 * @throws SFException if any struct field name is not a String
 */
static Map<String, Object> validateAndParseIcebergStruct(
    String columnName, Object input, long insertRowIndex) {
  if (!(input instanceof Map)) {
    throw typeNotAllowedException(
        columnName,
        input.getClass(),
        "STRUCT",
        new String[] {"Map<String, Object>"},
        insertRowIndex);
  }
  // Struct field names must all be Strings; reject the row otherwise.
  if (!((Map<?, ?>) input).keySet().stream().allMatch(key -> key instanceof String)) {
    throw new SFException(
        ErrorCode.INVALID_FORMAT_ROW,
        String.format(
            "Field name of a struct must be of type String, rowIndex:%d", insertRowIndex));
  }

  // Key type was verified above; confine the unavoidable unchecked cast to one statement.
  @SuppressWarnings("unchecked")
  Map<String, Object> structMap = (Map<String, Object>) input;
  return structMap;
}

/**
 * Validate and parse Iceberg list column to an Iterable. Allowed Java type:
 *
 * <ul>
 *   <li>Iterable
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Iterable
 */
static Iterable<Object> validateAndParseIcebergList(
    String columnName, Object input, long insertRowIndex) {
  if (input instanceof Iterable) {
    // Element types are validated downstream; only the container type is checked here.
    @SuppressWarnings("unchecked")
    Iterable<Object> iterable = (Iterable<Object>) input;
    return iterable;
  }
  throw typeNotAllowedException(
      columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
}

/**
 * Validate and parse Iceberg map column to a map. Allowed Java type:
 *
 * <ul>
 *   <li>Map&lt;Object, Object&gt;
 * </ul>
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return Object cast to Map
 */
static Map<Object, Object> validateAndParseIcebergMap(
    String columnName, Object input, long insertRowIndex) {
  if (input instanceof Map) {
    // Key/value types are validated downstream; only the container type is checked here.
    @SuppressWarnings("unchecked")
    Map<Object, Object> map = (Map<Object, Object>) input;
    return map;
  }
  throw typeNotAllowedException(
      columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
}

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
private int fieldId;
private String minStrValue;

private String maxStrValue;
Expand Down Expand Up @@ -84,6 +98,38 @@ class FileColumnProperties {
this.setDistinctValues(stats.getDistinctValues());
}

/**
 * Builds column properties from Parquet footer {@link Statistics} keyed by field id.
 * Distinct-value count is not available from Parquet statistics, so it is reported as
 * {@code EP_NDV_UNKNOWN}; collation and non-collated string bounds are not applicable
 * and are set to null.
 *
 * @param fieldId field id of the column; also reused as the column ordinal
 * @param statistics Parquet column statistics read from the file footer
 */
FileColumnProperties(int fieldId, Statistics<?> statistics) {
  this.setColumnOrdinal(fieldId);
  this.setFieldId(fieldId);
  this.setNullCount(statistics.getNumNulls());
  this.setDistinctValues(EP_NDV_UNKNOWN);
  this.setCollation(null);
  this.setMaxStrNonCollated(null);
  this.setMinStrNonCollated(null);

  if (statistics instanceof BooleanStatistics) {
    // Booleans are reported through the integer min/max slots as 0/1.
    this.setMinIntValue(
        ((BooleanStatistics) statistics).genericGetMin() ? BigInteger.ONE : BigInteger.ZERO);
    this.setMaxIntValue(
        ((BooleanStatistics) statistics).genericGetMax() ? BigInteger.ONE : BigInteger.ZERO);
  } else if (statistics instanceof IntStatistics || statistics instanceof LongStatistics) {
    // int32/int64 stats widen to long, then to BigInteger for the EP payload.
    this.setMinIntValue(BigInteger.valueOf(((Number) statistics.genericGetMin()).longValue()));
    this.setMaxIntValue(BigInteger.valueOf(((Number) statistics.genericGetMax()).longValue()));
  } else if (statistics instanceof FloatStatistics || statistics instanceof DoubleStatistics) {
    this.setMinRealValue((Double) statistics.genericGetMin());
    this.setMaxRealValue((Double) statistics.genericGetMax());
  } else if (statistics instanceof BinaryStatistics) {
    if (statistics.type().getLogicalTypeAnnotation()
        instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
      // Decimal-annotated binary: bytes hold the unscaled value, interpreted as a
      // signed big-endian BigInteger.
      this.setMinIntValue(new BigInteger(statistics.getMinBytes()));
      this.setMaxIntValue(new BigInteger(statistics.getMaxBytes()));
    } else {
      // Plain binary/string: report truncated hex bounds. NOTE(review): the boolean arg
      // presumably selects upward truncation for the max bound — confirm against
      // BinaryStringUtils.truncateBytesAsHex.
      this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false));
      this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true));
    }
  }
  // Other Statistics subtypes (e.g. int96) fall through with no min/max set.
}

@JsonProperty("columnId")
public int getColumnOrdinal() {
return columnOrdinal;
Expand All @@ -93,6 +139,15 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}

/** Field id of the column, serialized as "fieldId" in the register request. */
@JsonProperty("fieldId")
public int getFieldId() {
  return fieldId;
}

/** Sets the field id reported for this column. */
public void setFieldId(int fieldId) {
  this.fieldId = fieldId;
}

// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand All @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Pair;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

/**
* Interface to convert {@link ChannelData} buffered in {@link RowBuffer} to the underlying format
Expand Down Expand Up @@ -39,20 +40,23 @@ class SerializationResult {
// Estimated uncompressed size of the chunk, in bytes.
final float chunkEstimatedUncompressedSize;
// Serialized chunk bytes.
final ByteArrayOutputStream chunkData;
// Min/max insert timestamps (ms) across the rows in this chunk.
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
// Row-group (block) metadata of the written Parquet output.
final List<BlockMetaData> blocksMetadata;

/**
 * Container for the result of serializing buffered channel data.
 *
 * @param channelsMetadataList metadata of the channels whose rows were serialized
 * @param columnEpStatsMapCombined combined per-column EP statistics, keyed by column name
 * @param rowCount total number of rows serialized into the chunk
 * @param chunkEstimatedUncompressedSize estimated uncompressed chunk size in bytes
 * @param chunkData serialized chunk bytes
 * @param chunkMinMaxInsertTimeInMs min/max insert timestamps (ms) across the chunk's rows
 * @param blocksMetadata row-group metadata of the written Parquet output
 */
public SerializationResult(
    List<ChannelMetadata> channelsMetadataList,
    Map<String, RowBufferStats> columnEpStatsMapCombined,
    long rowCount,
    float chunkEstimatedUncompressedSize,
    ByteArrayOutputStream chunkData,
    Pair<Long, Long> chunkMinMaxInsertTimeInMs,
    List<BlockMetaData> blocksMetadata) {
  this.channelsMetadataList = channelsMetadataList;
  this.columnEpStatsMapCombined = columnEpStatsMapCombined;
  this.rowCount = rowCount;
  this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
  this.chunkData = chunkData;
  this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
  this.blocksMetadata = blocksMetadata;
}
}
}
Loading

0 comments on commit b4555cf

Please sign in to comment.