Commit: Address comments
sfc-gh-alhuang committed Sep 5, 2024
1 parent eab2ab2 commit ff8f766
Showing 16 changed files with 473 additions and 148 deletions.
File: ClientBufferParameters.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved.
  */
 
 package net.snowflake.ingest.streaming.internal;
@@ -20,25 +20,30 @@ public class ClientBufferParameters {
 
   private Constants.BdecParquetCompression bdecParquetCompression;
 
+  private boolean isIcebergMode;
+
   /**
    * Private constructor used for test methods
    *
    * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
    *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
+   * @param isIcebergMode
    */
   private ClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
-      boolean enableNewJsonParsingLogic) {
+      boolean enableNewJsonParsingLogic,
+      boolean isIcebergMode) {
     this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
    this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
    this.bdecParquetCompression = bdecParquetCompression;
    this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
+    this.isIcebergMode = isIcebergMode;
   }
 
   /** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -59,32 +64,38 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
         clientInternal != null
             ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
             : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
-
     this.enableNewJsonParsingLogic =
         clientInternal != null
             ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
             : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
+    this.isIcebergMode =
+        clientInternal != null
+            ? clientInternal.isIcebergMode()
+            : ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
   }
 
   /**
    * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
    *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
+   * @param isIcebergMode
    * @return ClientBufferParameters object
    */
   public static ClientBufferParameters test_createClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
-      boolean enableNewJsonParsingLogic) {
+      boolean enableNewJsonParsingLogic,
+      boolean isIcebergMode) {
     return new ClientBufferParameters(
         enableParquetInternalBuffering,
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
         bdecParquetCompression,
-        enableNewJsonParsingLogic);
+        enableNewJsonParsingLogic,
+        isIcebergMode);
   }
 
   public boolean getEnableParquetInternalBuffering() {
@@ -106,4 +117,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() {
   public boolean isEnableNewJsonParsingLogic() {
     return enableNewJsonParsingLogic;
   }
+
+  public boolean getIsIcebergMode() {
+    return isIcebergMode;
+  }
 }
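
For context, a minimal sketch of how a test might exercise the new flag through test_createClientBufferParameters. The sizes, compression value, and flag settings below are invented for illustration and are not taken from this commit:

    // Build buffer parameters for an Iceberg-mode test without a live client.
    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            false,                                 // enableParquetInternalBuffering
            32L * 1024 * 1024,                     // maxChunkSizeInBytes (illustrative)
            16L * 1024 * 1024,                     // maxAllowedRowSizeInBytes (illustrative)
            Constants.BdecParquetCompression.GZIP, // assumed enum constant
            true,                                  // enableNewJsonParsingLogic
            true);                                 // isIcebergMode
    assert params.getIsIcebergMode();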
File: ColumnMetadata.java
@@ -21,6 +21,12 @@ class ColumnMetadata {
   private Integer length;
   private boolean nullable;
   private String collation;
+
+  /**
+   * The Json serialization of Iceberg data type of the column, see <a
+   * href="https://iceberg.apache.org/spec/#appendix-c-json-serialization">JSON serialization</a>
+   * for more details.
+   */
   private String sourceIcebergDataType;
 
   /**
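
For reference, and assuming the serialization rules from the linked spec: primitive Iceberg types serialize as quoted JSON strings, so the stored value for a plain string column is the token "string" including the quotes, which is what ParquetRowBuffer matches against below. Illustrative values, not from this commit:

    // Possible contents of sourceIcebergDataType (illustrative):
    String intColumn = "\"int\"";
    String decimalColumn = "\"decimal(9, 2)\"";
    String stringColumn = "\"string\""; // the quoted form matched in ParquetRowBuffer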
File: DataValidationUtil.java
@@ -972,7 +972,17 @@ static double validateAndParseReal(String columnName, Object input, long insertRowIndex) {
    */
   static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) {
     if (input instanceof Number) {
-      return ((Number) input).intValue();
+      double value = ((Number) input).doubleValue();
+      long longValue = Math.round(value);
+      if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE || Double.isNaN(value)) {
+        throw new SFException(
+            ErrorCode.INVALID_VALUE_ROW,
+            String.format(
+                "Number out of representable inclusive range of integers between %d and %d,"
+                    + " rowIndex:%d",
+                Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex));
+      }
+      return (int) longValue;
     } else if (input instanceof String) {
       try {
         return Integer.parseInt(((String) input).trim());
@@ -1000,7 +1010,17 @@ static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) {
    */
   static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) {
     if (input instanceof Number) {
-      return ((Number) input).longValue();
+      double value = ((Number) input).doubleValue();
+      double roundedDouble = (value > 0) ? Math.floor(value + 0.5) : Math.ceil(value - 0.5);
+      if (roundedDouble > Long.MAX_VALUE || roundedDouble < Long.MIN_VALUE || Double.isNaN(value)) {
+        throw new SFException(
+            ErrorCode.INVALID_VALUE_ROW,
+            String.format(
+                "Number out of representable inclusive range of longs between %d and %d,"
+                    + " rowIndex:%d",
+                Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex));
+      }
+      return (long) roundedDouble;
     } else if (input instanceof String) {
       try {
         return Long.parseLong(((String) input).trim());
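
Both coercions now go through double space, so fractional inputs are rounded rather than truncated, and out-of-range or NaN inputs throw instead of silently overflowing. One subtlety: Math.round resolves ties toward positive infinity, while the long path's floor/ceil formulation rounds ties away from zero, so the two methods disagree on negative half-way inputs. A sketch of the resulting behavior (illustrative assertions, assuming the enclosing DataValidationUtil class; not from this commit's tests):

    // Fractional inputs round instead of truncating.
    assert DataValidationUtil.validateAndParseIcebergInt("COL", 3.7f, 0) == 4;
    // Math.round resolves the tie upward: -2.5 -> -2.
    assert DataValidationUtil.validateAndParseIcebergInt("COL", -2.5d, 0) == -2;
    // The long path rounds the tie away from zero: -2.5 -> -3.
    assert DataValidationUtil.validateAndParseIcebergLong("COL", -2.5d, 0) == -3L;
    // Out-of-range and NaN inputs now throw SFException(INVALID_VALUE_ROW):
    // DataValidationUtil.validateAndParseIcebergInt("COL", 1L << 40, 0);
    // DataValidationUtil.validateAndParseIcebergLong("COL", Double.NaN, 0);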
File: OpenChannelRequestInternal.java
@@ -101,7 +101,7 @@ String getOffsetToken() {
   public String getStringForLogging() {
     return String.format(
         "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
-            + " writeMode=%s)",
-        requestId, role, database, schema, table, channel, writeMode);
+            + " writeMode=%s, isIceberg=%s)",
+        requestId, role, database, schema, table, channel, writeMode, isIceberg);
   }
 }
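
With the extra field, a logged request would look roughly as follows; every value here is invented for illustration:

    // OpenChannelRequestInternal(requestId=5ab42, role=INGEST_ROLE, db=MY_DB, schema=PUBLIC,
    //     table=MY_TABLE, channel=MY_CHANNEL, writeMode=CLOUD_STORAGE, isIceberg=true)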
File: ParquetColumn.java (new file; the former private nested class in ParquetRowBuffer, promoted to its own top-level package-private class)
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import org.apache.parquet.schema.Type;
+
+/** Represents a column in a Parquet file. */
+class ParquetColumn {
+  final ColumnMetadata columnMetadata;
+  final int index;
+  final Type type;
+
+  ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) {
+    this.columnMetadata = columnMetadata;
+    this.index = index;
+    this.type = type;
+  }
+}
File: ParquetRowBuffer.java
@@ -15,6 +15,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -204,16 +205,17 @@ private float addRow(
       forkedStatsMap.put(columnName, forkedStats);
       ColumnMetadata column = parquetColumn.columnMetadata;
       ParquetBufferValue valueWithSize =
-          (column.getSourceIcebergDataType() == null
-              ? SnowflakeParquetValueParser.parseColumnValueToParquet(
+          (clientBufferParameters.getIsIcebergMode()
+              ? IcebergParquetValueParser.parseColumnValueToParquet(
+                  value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex)
+              : SnowflakeParquetValueParser.parseColumnValueToParquet(
                   value,
                   column,
                   parquetColumn.type.asPrimitiveType().getPrimitiveTypeName(),
                   forkedStats,
                   defaultTimezone,
-                  insertRowsCurrIndex)
-              : IcebergParquetValueParser.parseColumnValueToParquet(
-                  value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex));
+                  insertRowsCurrIndex,
+                  clientBufferParameters.isEnableNewJsonParsingLogic()));
       indexedRow[colIndex] = valueWithSize.getValue();
       size += valueWithSize.getSize();
     }
@@ -304,6 +306,10 @@ Object getVectorValueAt(String column, int index) {
     if (logicalType == ColumnLogicalType.BINARY && value != null) {
       value = value instanceof String ? ((String) value).getBytes(StandardCharsets.UTF_8) : value;
     }
+    /* Mismatch between Iceberg string & FDN String */
+    if (Objects.equals(columnMetadata.getSourceIcebergDataType(), "\"string\"")) {
+      value = value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8) : value;
+    }
     return value;
   }
 
@@ -339,16 +345,4 @@ public Flusher<ParquetChunkData> createFlusher() {
         clientBufferParameters.getMaxChunkSizeInBytes(),
         clientBufferParameters.getBdecParquetCompression());
   }
-
-  private static class ParquetColumn {
-    final ColumnMetadata columnMetadata;
-    final int index;
-    final Type type;
-
-    private ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) {
-      this.columnMetadata = columnMetadata;
-      this.index = index;
-      this.type = type;
-    }
-  }
 }
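
Two things change here: value parsing now dispatches on the client-wide Iceberg mode rather than on the per-column sourceIcebergDataType, and the read-back path in getVectorValueAt converts BINARY byte arrays back into Java strings for Iceberg string columns. Since sourceIcebergDataType holds the JSON serialization, the comparison is against the quoted literal. A small round-trip sketch under that assumption (values invented):

    String sourceType = "\"string\""; // columnMetadata.getSourceIcebergDataType()
    Object value = "snowflake".getBytes(StandardCharsets.UTF_8); // stored as BINARY bytes
    if (Objects.equals(sourceType, "\"string\"")) {
      value = value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8) : value;
    }
    assert "snowflake".equals(value); // back to a Java String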
File: ParquetTypeGenerator.java
@@ -4,16 +4,14 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import static net.snowflake.ingest.utils.IcebergDataTypeParser.deserializeIcebergType;
-
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.IcebergDataTypeParser;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.iceberg.parquet.TypeToMessageType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -33,9 +31,6 @@ public class ParquetTypeGenerator {
           AbstractRowBuffer.ColumnPhysicalType.SB8,
           AbstractRowBuffer.ColumnPhysicalType.SB16));
 
-  /** Util class that contains the mapping between Iceberg data type and Parquet data type */
-  private static final TypeToMessageType typeToMessageType = new TypeToMessageType();
-
   /**
    * Generate the column parquet type and metadata from the column metadata received from server
    * side.
@@ -46,7 +41,6 @@
    */
   static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) {
     id = column.getOrdinal() == null ? id : column.getOrdinal();
-    ParquetTypeInfo res = new ParquetTypeInfo();
     Type parquetType;
     Map<String, String> metadata = new HashMap<>();
     String name = column.getInternalName();
@@ -60,10 +54,9 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) {
         column.getNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
 
     if (column.getSourceIcebergDataType() != null) {
-      org.apache.iceberg.types.Type icebergDataType =
-          deserializeIcebergType(column.getSourceIcebergDataType());
       parquetType =
-          typeToMessageType.primitive(icebergDataType.asPrimitiveType(), repetition, id, name);
+          IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType(
+              column.getSourceIcebergDataType(), repetition, id, name);
     } else {
       AbstractRowBuffer.ColumnPhysicalType physicalType;
       AbstractRowBuffer.ColumnLogicalType logicalType;
@@ -147,9 +140,7 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) {
             ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType());
       }
     }
-    res.setParquetType(parquetType);
-    res.setMetadata(metadata);
-    return res;
+    return new ParquetTypeInfo(parquetType, metadata);
   }
 
   /**
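
The two-step deserialize-then-map flow through TypeToMessageType is collapsed into a single call on IcebergDataTypeParser. A hypothetical invocation, with argument values invented for illustration:

    // Build the Parquet type for a nullable Iceberg string column (illustrative).
    Type parquetType =
        IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType(
            "\"string\"",             // JSON-serialized Iceberg type
            Type.Repetition.OPTIONAL, // nullable column
            1,                        // field id
            "MY_COLUMN");             // column name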
File: ParquetTypeInfo.java
@@ -5,16 +5,20 @@
 package net.snowflake.ingest.streaming.internal;
 
 import java.util.Map;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
 /**
  * Util class that contains Parquet type and other metadata for that type needed by the Snowflake
  * server side scanner
  */
 class ParquetTypeInfo {
-  private Type parquetType;
-  private Map<String, String> metadata;
+  private final Type parquetType;
+  private final Map<String, String> metadata;
+
+  ParquetTypeInfo(Type parquetType, Map<String, String> metadata) {
+    this.parquetType = parquetType;
+    this.metadata = metadata;
+  }
 
   public Type getParquetType() {
     return this.parquetType;
@@ -23,16 +27,4 @@ public Type getParquetType() {
   public Map<String, String> getMetadata() {
     return this.metadata;
   }
-
-  public void setParquetType(Type parquetType) {
-    this.parquetType = parquetType;
-  }
-
-  public void setMetadata(Map<String, String> metadata) {
-    this.metadata = metadata;
-  }
-
-  public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() {
-    return parquetType.asPrimitiveType().getPrimitiveTypeName();
-  }
 }
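
Making ParquetTypeInfo immutable removes the setters and the getPrimitiveTypeName() convenience; callers now reach through the Parquet type itself, as ParquetRowBuffer does above. A sketch of the replacement pattern (hypothetical caller code):

    ParquetTypeInfo info = new ParquetTypeInfo(parquetType, metadata);
    PrimitiveType.PrimitiveTypeName typeName =
        info.getParquetType().asPrimitiveType().getPrimitiveTypeName();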
[8 of the 16 changed files are shown above; the rest of the diff was not loaded.]
