SNOW-1708577 Parquet V2 support for new table format (#851)
This PR aims to ensure consistency with the Iceberg table scanner and registration on the server side. It includes the following changes:

- Added support for Parquet V2 (delta encoding) when the schema's STORAGE_SERIALIZATION_POLICY is set to OPTIMIZED (a minimal sketch of the mapping follows this list).
- Enabled dictionary encoding for Iceberg mode.
- Moved writerVersion defaulting out of the channel constructor and into the channel construction call site in clientInternal, instead of passing writerVersion=null into the channel.
- Removed a test-only overload of the ChannelInternal constructor.
- Removed an unnecessary parameter (bdecVersion) from the ChannelInternal constructor.
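
For illustration, a minimal sketch of the OPTIMIZED-to-Parquet-V2 mapping and the dictionary-encoding toggle is below. It uses only the public parquet-mr `ParquetProperties` builder; the `serializationPolicy` string and `isIcebergMode` flag are hypothetical stand-ins for this sketch, not the SDK's actual plumbing (the real mapping lives in `Constants.IcebergSerializationPolicy` in the diff).

```java
import org.apache.parquet.column.ParquetProperties;

/** Illustrative sketch only: maps a serialization policy string to Parquet writer settings. */
public class SerializationPolicySketch {

  static ParquetProperties writerProperties(String serializationPolicy, boolean isIcebergMode) {
    // OPTIMIZED -> Parquet V2 (delta encodings); anything else -> Parquet V1.
    ParquetProperties.WriterVersion writerVersion =
        "OPTIMIZED".equalsIgnoreCase(serializationPolicy)
            ? ParquetProperties.WriterVersion.PARQUET_2_0
            : ParquetProperties.WriterVersion.PARQUET_1_0;

    return ParquetProperties.builder()
        .withWriterVersion(writerVersion)
        // Dictionary encoding is enabled only for Iceberg mode in this change.
        .withDictionaryEncoding(isIcebergMode)
        .build();
  }

  public static void main(String[] args) {
    ParquetProperties props = writerProperties("OPTIMIZED", true);
    System.out.println(props.getWriterVersion()); // PARQUET_2_0
  }
}
```

As the diff below shows, non-Iceberg channels keep `ParquetProperties.WriterVersion.PARQUET_1_0`, which matches the previous default.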

---------

Co-authored-by: Hitesh Madan <[email protected]>
sfc-gh-alhuang and sfc-gh-hmadan authored Oct 14, 2024
1 parent a6339aa commit 6d6ba94
Showing 32 changed files with 389 additions and 118 deletions.
@@ -25,6 +25,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;

/**
* The abstract implementation of the buffer in the Streaming Ingest channel that holds the
@@ -668,6 +669,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
switch (bdecVersion) {
case THREE:
@@ -681,6 +683,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
channelRuntimeState,
clientBufferParameters,
offsetTokenVerificationFunction,
parquetWriterVersion,
telemetryService);
default:
throw new SFException(
@@ -32,6 +32,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
@@ -140,7 +141,7 @@ static <T> Blob constructBlobAndMetadata(

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
@@ -124,4 +124,8 @@ public Optional<Integer> getMaxRowGroups() {
public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public boolean isEnableDictionaryEncoding() {
return isIcebergMode;
}
}
@@ -22,6 +22,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo icebergLocationInfo;
private String icebergSerializationPolicy;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
@@ -140,4 +141,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) {
FileLocationInfo getIcebergLocationInfo() {
return this.icebergLocationInfo;
}

@JsonProperty("iceberg_serialization_policy")
void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
this.icebergSerializationPolicy = icebergSerializationPolicy;
}

String getIcebergSerializationPolicy() {
return this.icebergSerializationPolicy;
}
}
@@ -16,6 +16,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;

@@ -30,17 +31,23 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;

/** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
}

@Override
@@ -129,7 +136,9 @@ private SerializationResult serializeFromJavaObjects(
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
maxRowGroups,
bdecParquetCompression);
bdecParquetCompression,
parquetWriterVersion,
enableDictionaryEncoding);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

@@ -26,6 +26,7 @@
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
private final List<List<Object>> data;
private final List<List<Object>> tempData;

private final ParquetProperties.WriterVersion parquetWriterVersion;

private MessageType schema;

/** Construct a ParquetRowBuffer object. */
@@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
super(
onErrorOption,
@@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
this.tempData = new ArrayList<>();
this.parquetWriterVersion = parquetWriterVersion;
}

/**
@@ -394,6 +399,8 @@ public Flusher<ParquetChunkData> createFlusher() {
schema,
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression());
clientBufferParameters.getBdecParquetCompression(),
parquetWriterVersion,
clientBufferParameters.isEnableDictionaryEncoding());
}
}
@@ -8,6 +8,7 @@
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/** Builds a Streaming Ingest channel for a specific Streaming Ingest client */
class SnowflakeStreamingIngestChannelFactory {
@@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {
private OpenChannelRequest.OnErrorOption onErrorOption;
private ZoneId defaultTimezone;
private OffsetTokenVerificationFunction offsetTokenVerificationFunction;
private ParquetProperties.WriterVersion parquetWriterVersion;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
@@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder<T> setOffsetTokenVerificationFunction(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setParquetWriterVersion(
ParquetProperties.WriterVersion parquetWriterVersion) {
this.parquetWriterVersion = parquetWriterVersion;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
@@ -123,8 +131,8 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.offsetTokenVerificationFunction);
this.offsetTokenVerificationFunction,
this.parquetWriterVersion);
}
}
}
@@ -10,7 +10,6 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -19,17 +18,18 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* The first version of implementation for SnowflakeStreamingIngestChannel
@@ -68,48 +68,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
private final MemoryInfoProvider memoryInfoProvider;
private volatile long freeMemoryInBytes = 0;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
* @param name
* @param dbName
* @param schemaName
* @param tableName
* @param offsetToken
* @param channelSequencer
* @param rowSequencer
* @param client
*/
SnowflakeStreamingIngestChannelInternal(
String name,
String dbName,
String schemaName,
String tableName,
String offsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone) {
this(
name,
dbName,
schemaName,
tableName,
offsetToken,
channelSequencer,
rowSequencer,
client,
encryptionKey,
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion(),
null);
}

/** Default constructor */
SnowflakeStreamingIngestChannelInternal(
String name,
@@ -119,13 +77,13 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String endOffsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
@Nonnull SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion,
OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.isClosed = false;
this.owningClient = client;

@@ -147,13 +105,14 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
AbstractRowBuffer.createRowBuffer(
onErrorOption,
defaultTimezone,
bdecVersion,
client.getParameterProvider().getBlobFormatVersion(),
getFullyQualifiedName(),
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient),
offsetTokenVerificationFunction,
owningClient == null ? null : owningClient.getTelemetryService());
parquetWriterVersion,
owningClient.getTelemetryService());
this.tableColumns = new HashMap<>();
logger.logInfo(
"Channel={} created for table={}",
@@ -71,6 +71,7 @@
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* The first version of implementation for SnowflakeStreamingIngestClient. The client internally
@@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
}

if (isIcebergMode
&& response.getTableColumns().stream()
.anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
if (isIcebergMode) {
if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
}

if (response.getIcebergSerializationPolicy() == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Iceberg Table's open channel response does not have serialization policy set.");
}
}

logger.logInfo(
@@ -386,6 +393,12 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
.setOnErrorOption(request.getOnErrorOption())
.setDefaultTimezone(request.getDefaultTimezone())
.setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction())
.setParquetWriterVersion(
isIcebergMode
? Constants.IcebergSerializationPolicy.valueOf(
response.getIcebergSerializationPolicy())
.toParquetWriterVersion()
: ParquetProperties.WriterVersion.PARQUET_1_0)
.build();

// Setup the row buffer schema
25 changes: 24 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
@@ -71,9 +72,31 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

/**
* Iceberg table serialization policy. Use v2 parquet writer for optimized serialization,
* otherwise v1.
*/
public enum IcebergSerializationPolicy {
COMPATIBLE,
OPTIMIZED;

public ParquetProperties.WriterVersion toParquetWriterVersion() {
switch (this) {
case COMPATIBLE:
return ParquetProperties.WriterVersion.PARQUET_1_0;
case OPTIMIZED:
return ParquetProperties.WriterVersion.PARQUET_2_0;
default:
throw new IllegalArgumentException(
String.format(
"Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s",
this.name(), Arrays.asList(IcebergSerializationPolicy.values())));
}
}
}

public enum WriteMode {
CLOUD_STORAGE,
REST_API,