Support structured data types
EP info

Add log

Fix IT

Add tests for nested datatype

fix ep tmp
sfc-gh-alhuang authored and sfc-gh-hmadan committed Sep 19, 2024
1 parent 7eb7ba5 commit 7380c36
Showing 21 changed files with 1,390 additions and 350 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -25,6 +25,9 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

/**
* The abstract implementation of the buffer in the Streaming Ingest channel that holds the
@@ -655,6 +658,51 @@ static EpInfo buildEpInfoFromStats(long rowCount, Map<String, RowBufferStats> co
return epInfo;
}

static EpInfo buildEPInfoFromParquetWriterResults(
List<BlockMetaData> blocksMetadata,
Map<String, Integer> ndvStats,
Map<String, Integer> maxLengthStats) {
if (blocksMetadata.isEmpty()) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "No blocks metadata found");
}
EpInfo epInfo =
new EpInfo(
blocksMetadata.stream().mapToLong(BlockMetaData::getRowCount).sum(), new HashMap<>());

Map<String, Statistics<?>> mergedStatistics = new HashMap<>();
for (BlockMetaData blockMetaData : blocksMetadata) {
for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
String subColumnName = columnChunkMetaData.getPath().toDotString();
if (mergedStatistics.get(subColumnName) == null) {
mergedStatistics.put(subColumnName, columnChunkMetaData.getStatistics());
} else {
mergedStatistics.get(subColumnName).mergeStatistics(columnChunkMetaData.getStatistics());
}
}
}

String prevColumnName = "";
int columnOrdinal = 0;
for (ColumnChunkMetaData columnChunkMetaData : blocksMetadata.get(0).getColumns()) {
if (!prevColumnName.equals(columnChunkMetaData.getPath().toArray()[0])) {
columnOrdinal++;
prevColumnName = columnChunkMetaData.getPath().toArray()[0];
}
String subColumnName = columnChunkMetaData.getPath().toDotString();
FileColumnProperties dto =
new FileColumnProperties(
columnOrdinal,
columnChunkMetaData.getPrimitiveType().getId().intValue(),
mergedStatistics.get(subColumnName),
ndvStats.getOrDefault(subColumnName, 0),
maxLengthStats.getOrDefault(subColumnName, 0));
epInfo.getColumnEps().put(subColumnName, dto);
}

epInfo.verifyEpInfo();
return epInfo;
}

/** Row buffer factory. */
static <T> AbstractRowBuffer<T> createRowBuffer(
OpenChannelRequest.OnErrorOption onErrorOption,
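
For reference, the merge loop added above folds each row group's ColumnChunkMetaData statistics into a single Statistics object per dot-separated column path before the EP metadata is built. The following standalone sketch performs the same kind of merge directly from a Parquet footer; it assumes parquet-hadoop and hadoop-common are on the classpath, and the class name and the data.parquet path are placeholders, not part of this commit.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class MergeFooterStats {
  public static void main(String[] args) throws IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("data.parquet"), new Configuration()))) {
      Map<String, Statistics<?>> merged = new HashMap<>();
      long rowCount = 0;
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        rowCount += block.getRowCount();
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          // Key by the dot path, e.g. "person.name" for a nested field.
          String dotPath = chunk.getPath().toDotString();
          if (merged.containsKey(dotPath)) {
            merged.get(dotPath).mergeStatistics(chunk.getStatistics());
          } else {
            merged.put(dotPath, chunk.getStatistics());
          }
        }
      }
      System.out.println("rows=" + rowCount + ", columns=" + merged.keySet());
    }
  }
}
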
@@ -62,13 +62,15 @@ class BlobBuilder
* belongs to the same table. Will error if this is not the case
* @param bdecVersion version of blob
* @param encrypt If the output chunk is encrypted or not
* @param isIceberg If the streaming client writes to an Iceberg table or not
* @return {@link Blob} data
*/
static <T> Blob constructBlobAndMetadata(
String filePath,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
boolean encrypt)
boolean encrypt,
boolean isIceberg)
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
BadPaddingException {
@@ -132,8 +134,13 @@ static <T> Blob constructBlobAndMetadata(
.setChunkMD5(md5)
.setEncryptionKeyId(firstChannelFlushContext.getEncryptionKeyId())
.setEpInfo(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
isIceberg
? AbstractRowBuffer.buildEPInfoFromParquetWriterResults(
serializedChunk.blocksMetadata,
serializedChunk.ndvStats,
serializedChunk.maxLengthStats)
: AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount, serializedChunk.columnEpStatsMapCombined))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond())
.build();
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -200,4 +200,14 @@ Long getFirstInsertTimeInMs() {
Long getLastInsertTimeInMs() {
return this.lastInsertTimeInMs;
}

// @JsonProperty("major_vers")
// Integer getMajorVersion() {
// return PARQUET_MAJOR_VERSION;
// }
//
// @JsonProperty("minor_vers")
// Integer getMinorVersion() {
// return PARQUET_MINOR_VERSION;
// }
}
@@ -1055,6 +1055,80 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}

/**
* Validate and cast Iceberg struct column to Map<String, Object>. Allowed Java type:
*
* <ul>
* <li>Map<String, Object>
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Map
*/
static Map<String, Object> validateAndParseIcebergStruct(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Map)) {
throw typeNotAllowedException(
columnName,
input.getClass(),
"STRUCT",
new String[] {"Map<String, Object>"},
insertRowIndex);
}
if (!((Map<?, ?>) input).keySet().stream().allMatch(key -> key instanceof String)) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Flied name of a struct must be of type String, rowIndex:%d", insertRowIndex));
}

return (Map<String, Object>) input;
}

/**
* Validate and parse Iceberg list column to an Iterable. Allowed Java type:
*
* <ul>
* <li>Iterable
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Iterable
*/
static Iterable<Object> validateAndParseIcebergList(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Iterable)) {
throw typeNotAllowedException(
columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
}
return (Iterable<Object>) input;
}

/**
* Validate and parse Iceberg map column to a map. Allowed Java type:
*
* <ul>
* <li>Map<Object, Object>
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Object cast to Map
*/
static Map<Object, Object> validateAndParseIcebergMap(
String columnName, Object input, long insertRowIndex) {
if (!(input instanceof Map)) {
throw typeNotAllowedException(
columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
}
return (Map<Object, Object>) input;
}

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
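
The three validators above only check and cast the outer Java container type for Iceberg STRUCT, LIST, and MAP columns. As a caller-side illustration, a row for such a table can be built from plain Maps and Iterables as sketched below; the column names and the commented-out channel call are hypothetical and not part of this commit.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class IcebergRowExample {
  public static void main(String[] args) {
    Map<String, Object> person = new HashMap<>(); // STRUCT value: Map with String keys
    person.put("name", "Alice");
    person.put("age", 30);

    Map<String, Integer> scores = new HashMap<>(); // MAP value: key/value objects
    scores.put("math", 95);

    Map<String, Object> row = new HashMap<>();
    row.put("PERSON", person);                     // STRUCT column
    row.put("TAGS", Arrays.asList("a", "b"));      // LIST column: any Iterable
    row.put("SCORES", scores);                     // MAP column

    System.out.println(row);
    // channel.insertRow(row, "offset-1"); // channel: a SnowflakeStreamingIngestChannel, not constructed here
  }
}
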
@@ -1,14 +1,28 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
private int fieldId;
private String minStrValue;

private String maxStrValue;
@@ -84,6 +98,40 @@ class FileColumnProperties {
this.setDistinctValues(stats.getDistinctValues());
}

FileColumnProperties(
int columnOrdinal, int fieldId, Statistics<?> statistics, int ndv, int maxLength) {
this.setColumnOrdinal(columnOrdinal);
this.setFieldId(fieldId);
this.setNullCount(statistics.getNumNulls());
this.setDistinctValues(ndv);
this.setCollation(null);
this.setMaxStrNonCollated(null);
this.setMinStrNonCollated(null);

if (statistics instanceof BooleanStatistics) {
this.setMinIntValue(
((BooleanStatistics) statistics).genericGetMin() ? BigInteger.ONE : BigInteger.ZERO);
this.setMaxIntValue(
((BooleanStatistics) statistics).genericGetMax() ? BigInteger.ONE : BigInteger.ZERO);
} else if (statistics instanceof IntStatistics || statistics instanceof LongStatistics) {
this.setMinIntValue(BigInteger.valueOf(((Number) statistics.genericGetMin()).longValue()));
this.setMaxIntValue(BigInteger.valueOf(((Number) statistics.genericGetMax()).longValue()));
} else if (statistics instanceof FloatStatistics || statistics instanceof DoubleStatistics) {
this.setMinRealValue((Double) statistics.genericGetMin());
this.setMaxRealValue((Double) statistics.genericGetMax());
} else if (statistics instanceof BinaryStatistics) {
if (statistics.type().getLogicalTypeAnnotation()
instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
this.setMinIntValue(new BigInteger(statistics.getMinBytes()));
this.setMaxIntValue(new BigInteger(statistics.getMaxBytes()));
} else {
this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false));
this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true));
this.setMaxLength(maxLength);
}
}
}

@JsonProperty("columnId")
public int getColumnOrdinal() {
return columnOrdinal;
@@ -93,6 +141,16 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}

@JsonProperty("fieldId")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
int getFieldId() {
return fieldId;
}

void setFieldId(int fieldId) {
this.fieldId = fieldId;
}

// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
@@ -206,6 +264,7 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"columnOrdinal\": ").append(columnOrdinal);
sb.append(", \"fieldId\": ").append(fieldId);
if (minIntValue != null) {
sb.append(", \"minIntValue\": ").append(minIntValue);
sb.append(", \"maxIntValue\": ").append(maxIntValue);
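
One detail in the new constructor worth noting: for BinaryStatistics on a DECIMAL-annotated column, the Parquet min/max bytes hold the unscaled value as a big-endian two's-complement integer, which is why they are converted with new BigInteger(bytes) and reported as integer bounds. A tiny sketch of that decoding, with made-up bytes:

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalStatsDecode {
  public static void main(String[] args) {
    byte[] minBytes = {0x04, (byte) 0xD2};            // big-endian two's complement of 1234
    BigInteger unscaled = new BigInteger(minBytes);   // 1234
    BigDecimal decoded = new BigDecimal(unscaled, 2); // with scale 2 this is 12.34
    System.out.println(unscaled + " -> " + decoded);
  }
}
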
@@ -603,7 +603,8 @@ BlobMetadata buildAndUpload(
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
this.owningClient.getInternalParameterProvider().getEnableChunkEncryption(),
this.owningClient.getInternalParameterProvider().getIsIcebergMode());

blob.blobStats.setBuildDurationMs(buildContext);

14 changes: 12 additions & 2 deletions src/main/java/net/snowflake/ingest/streaming/internal/Flusher.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Pair;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

/**
* Interface to convert {@link ChannelData} buffered in {@link RowBuffer} to the underlying format
@@ -39,20 +40,29 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final List<BlockMetaData> blocksMetadata;
final Map<String, Integer> ndvStats;
final Map<String, Integer> maxLengthStats;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
List<BlockMetaData> blocksMetadata,
Map<String, Integer> ndvStats,
Map<String, Integer> maxLengthStats) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.blocksMetadata = blocksMetadata;
this.ndvStats = ndvStats;
this.maxLengthStats = maxLengthStats;
}
}
}
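
SerializationResult now also carries the Parquet row-group metadata plus per-column NDV and max-length maps keyed by dot path, which buildEPInfoFromParquetWriterResults consumes above. As a rough, hypothetical illustration of how such maps could be accumulated while rows are buffered (this is not the writer's actual bookkeeping):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ColumnStatsTracker {
  private final Map<String, Set<Object>> distinctValues = new HashMap<>();
  private final Map<String, Integer> maxLength = new HashMap<>();

  void track(String dotPath, Object value) {
    if (value == null) {
      return; // nulls are counted separately via Parquet's null counts
    }
    distinctValues.computeIfAbsent(dotPath, k -> new HashSet<>()).add(value);
    if (value instanceof String) {
      int utf8Length = ((String) value).getBytes(StandardCharsets.UTF_8).length;
      maxLength.merge(dotPath, utf8Length, Math::max);
    }
  }

  Map<String, Integer> ndvStats() {
    Map<String, Integer> ndv = new HashMap<>();
    distinctValues.forEach((path, values) -> ndv.put(path, values.size()));
    return ndv;
  }

  Map<String, Integer> maxLengthStats() {
    return maxLength;
  }
}
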
