Skip to content

Commit

Permalink
SNOW-1666189 Structured data type support (#841)
Browse files Browse the repository at this point in the history
Co-authored-by: Hitesh Madan <[email protected]>
  • Loading branch information
sfc-gh-alhuang and sfc-gh-hmadan authored Sep 30, 2024
1 parent 27cd3a7 commit d283df1
Show file tree
Hide file tree
Showing 18 changed files with 1,499 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
private static final String BDEC_PARQUET_MESSAGE_TYPE_NAME = "bdec";
private static final String PARQUET_MESSAGE_TYPE_NAME = "schema";

private long maxChunkSizeInBytes;

Expand Down Expand Up @@ -118,4 +120,8 @@ public boolean getIsIcebergMode() {
/**
 * Returns the value currently held in the {@code maxRowGroups} field.
 *
 * @return the stored {@code Optional<Integer>}, returned as-is without copying
 */
public Optional<Integer> getMaxRowGroups() {
  return this.maxRowGroups;
}

/**
 * Selects the Parquet message type name that matches the current mode.
 *
 * @return {@code PARQUET_MESSAGE_TYPE_NAME} when {@code isIcebergMode} is set, otherwise {@code
 *     BDEC_PARQUET_MESSAGE_TYPE_NAME}
 */
public String getParquetMessageTypeName() {
  if (isIcebergMode) {
    return PARQUET_MESSAGE_TYPE_NAME;
  }
  return BDEC_PARQUET_MESSAGE_TYPE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,83 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
insertRowIndex);
}

/**
 * Validate and cast Iceberg struct column to {@code Map<String, Object>}. Allowed Java type:
 *
 * <ul>
 *   <li>{@code Map<String, Object>}
 * </ul>
 *
 * <p>Only the keys are inspected: each one must be a {@code String} (a {@code null} key is
 * rejected as well, since it is not an instance of {@code String}). Values are returned untouched.
 * The type-error path calls {@code input.getClass()}, so {@code input} is expected to be non-null.
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return the same {@code input} instance, cast to a map with {@code String} keys
 * @throws SFException with {@code ErrorCode.INVALID_FORMAT_ROW} if any key is not a {@code
 *     String}; the exception produced by {@code typeNotAllowedException} is thrown if {@code
 *     input} is not a {@code Map}
 */
static Map<String, ?> validateAndParseIcebergStruct(
    String columnName, Object input, long insertRowIndex) {
  if (!(input instanceof Map)) {
    throw typeNotAllowedException(
        columnName,
        input.getClass(),
        "STRUCT",
        new String[] {"Map<String, Object>"},
        insertRowIndex);
  }
  for (Object key : ((Map<?, ?>) input).keySet()) {
    if (!(key instanceof String)) {
      throw new SFException(
          ErrorCode.INVALID_FORMAT_ROW,
          String.format(
              "Field name of struct %s must be of type String, rowIndex:%d",
              columnName, insertRowIndex));
    }
  }

  // Safe cast: every key was verified to be a String by the loop above, and the value type
  // remains an unbounded wildcard, so no heap pollution can be introduced here.
  @SuppressWarnings("unchecked")
  Map<String, ?> struct = (Map<String, ?>) input;
  return struct;
}

/**
 * Checks that the value supplied for an Iceberg list column is an {@code Iterable} and returns it
 * under that type. Allowed Java type:
 *
 * <ul>
 *   <li>{@code Iterable}
 * </ul>
 *
 * <p>The elements themselves are not examined by this method.
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return the same {@code input} instance, viewed as an {@code Iterable}
 */
static Iterable<?> validateAndParseIcebergList(
    String columnName, Object input, long insertRowIndex) {
  if (input instanceof Iterable) {
    return (Iterable<?>) input;
  }
  // Anything that is not iterable is reported as a disallowed type for LIST.
  final Class<?> actualType = input.getClass();
  throw typeNotAllowedException(
      columnName, actualType, "LIST", new String[] {"Iterable"}, insertRowIndex);
}

/**
 * Checks that the value supplied for an Iceberg map column is a {@code Map} and returns it under
 * that type. Allowed Java type:
 *
 * <ul>
 *   <li>{@code Map<Object, Object>}
 * </ul>
 *
 * <p>Neither keys nor values are examined by this method.
 *
 * @param columnName Column name, used in validation error messages
 * @param input Object to validate and parse
 * @param insertRowIndex Row index for error reporting
 * @return the same {@code input} instance, viewed as a {@code Map}
 */
static Map<?, ?> validateAndParseIcebergMap(
    String columnName, Object input, long insertRowIndex) {
  if (input instanceof Map) {
    return (Map<?, ?>) input;
  }
  // Anything that is not a Map is reported as a disallowed type for MAP.
  final Class<?> actualType = input.getClass();
  throw typeNotAllowedException(
      columnName, actualType, "MAP", new String[] {"Map"}, insertRowIndex);
}

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
BigDecimal comparand =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigInteger;
import java.util.Objects;

/** Audit register endpoint/FileColumnPropertyDTO property list. */
class FileColumnProperties {
private int columnOrdinal;
private Integer fieldId;
private String minStrValue;

private String maxStrValue;
Expand Down Expand Up @@ -46,6 +52,7 @@ class FileColumnProperties {

FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
this.setColumnOrdinal(stats.getOrdinal());
this.setFieldId(stats.getFieldId());
this.setCollation(stats.getCollationDefinitionString());
this.setMaxIntValue(
stats.getCurrentMaxIntValue() == null
Expand Down Expand Up @@ -93,6 +100,16 @@ public void setColumnOrdinal(int columnOrdinal) {
this.columnOrdinal = columnOrdinal;
}

/**
 * Returns the field id stored on this column property object.
 *
 * <p>Exposed to Jackson under the property name {@code fieldId}. Because of {@code
 * JsonInclude.Include.NON_NULL}, the property is left out of the serialized output when the value
 * is {@code null}.
 *
 * @return the stored field id, possibly {@code null}
 */
@JsonProperty("fieldId")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getFieldId() {
  return this.fieldId;
}

/**
 * Stores the given field id on this column property object.
 *
 * @param fieldId value to assign to the {@code fieldId} field; no validation is performed here, so
 *     {@code null} is stored as-is
 */
public void setFieldId(Integer fieldId) {
this.fieldId = fieldId;
}

// Annotation required in order to have package private fields serialized
@JsonProperty("minStrValue")
String getMinStrValue() {
Expand Down Expand Up @@ -206,6 +223,7 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"columnOrdinal\": ").append(columnOrdinal);
sb.append(", \"fieldId\": ").append(fieldId);
if (minIntValue != null) {
sb.append(", \"minIntValue\": ").append(minIntValue);
sb.append(", \"maxIntValue\": ").append(maxIntValue);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand Down
Loading

0 comments on commit d283df1

Please sign in to comment.