SNOW-1708577 Parquet V2 support for new table format #851

Merged
merged 4 commits on Oct 14, 2024
@@ -25,6 +25,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;

/**
* The abstract implementation of the buffer in the Streaming Ingest channel that holds the
@@ -668,6 +669,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
switch (bdecVersion) {
case THREE:
@@ -681,6 +683,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
channelRuntimeState,
clientBufferParameters,
offsetTokenVerificationFunction,
parquetWriterVersion,
telemetryService);
default:
throw new SFException(
@@ -32,6 +32,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
@@ -140,7 +141,7 @@ static <T> Blob constructBlobAndMetadata(

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
Collaborator

I'd much rather depend on our own constant than a third-party library's constant. I thought I had left a comment on this but don't see it anywhere :(

Collaborator

OK to take in the next PR too; just remove the import whenever you revert this.

.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
@@ -124,4 +124,8 @@ public Optional<Integer> getMaxRowGroups() {
public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public boolean isEnableDictionaryEncoding() {
return isIcebergMode;
Collaborator

This might depend on the storage serialization policy too; let's verify. No need to hold up the PR.

}
}
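To illustrate how these two new knobs are typically consumed downstream, here is a minimal, hedged sketch using the Parquet library's ParquetProperties builder; it is illustrative only and not the SDK's actual BdecParquetWriter wiring:

import org.apache.parquet.column.ParquetProperties;

/** Illustrative sketch only; not the SDK's actual writer construction. */
public class WriterPropertiesSketch {
  static ParquetProperties buildProperties(
      ParquetProperties.WriterVersion writerVersion, boolean enableDictionaryEncoding) {
    return ParquetProperties.builder()
        .withWriterVersion(writerVersion) // PARQUET_1_0 (COMPATIBLE/FDN) or PARQUET_2_0 (OPTIMIZED)
        .withDictionaryEncoding(enableDictionaryEncoding) // true only in Iceberg mode per this PR
        .build();
  }

  public static void main(String[] args) {
    ParquetProperties icebergProps =
        buildProperties(ParquetProperties.WriterVersion.PARQUET_2_0, true);
    System.out.println(icebergProps.getWriterVersion()); // PARQUET_2_0
  }
}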
@@ -22,6 +22,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo icebergLocationInfo;
private String icebergSerializationPolicy;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
@@ -140,4 +141,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) {
FileLocationInfo getIcebergLocationInfo() {
return this.icebergLocationInfo;
}

@JsonProperty("iceberg_serialization_policy")
void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
this.icebergSerializationPolicy = icebergSerializationPolicy;
}

String getIcebergSerializationPolicy() {
return this.icebergSerializationPolicy;
}
}
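For reference, a hedged sketch of how the new snake_case field binds through Jackson; the payload fragment below is made up for illustration and is not a real server response:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

/** Illustrative only; OpenChannelResponse is package-private, so a plain Map stands in for it. */
public class SerializationPolicyFieldSketch {
  public static void main(String[] args) throws Exception {
    String fragment = "{\"iceberg_serialization_policy\":\"OPTIMIZED\"}";
    Map<?, ?> parsed = new ObjectMapper().readValue(fragment, Map.class);
    System.out.println(parsed.get("iceberg_serialization_policy")); // OPTIMIZED
  }
}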
@@ -16,6 +16,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;

@@ -30,17 +31,23 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;

/** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
}

@Override
@@ -129,7 +136,9 @@ private SerializationResult serializeFromJavaObjects(
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
maxRowGroups,
bdecParquetCompression);
bdecParquetCompression,
parquetWriterVersion,
enableDictionaryEncoding);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

@@ -26,6 +26,7 @@
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
private final List<List<Object>> data;
private final List<List<Object>> tempData;

private final ParquetProperties.WriterVersion parquetWriterVersion;

private MessageType schema;

/** Construct a ParquetRowBuffer object. */
@@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
super(
onErrorOption,
@@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
this.tempData = new ArrayList<>();
this.parquetWriterVersion = parquetWriterVersion;
}

/**
@@ -394,6 +399,8 @@ public Flusher<ParquetChunkData> createFlusher() {
schema,
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression());
clientBufferParameters.getBdecParquetCompression(),
parquetWriterVersion,
clientBufferParameters.isEnableDictionaryEncoding());
}
}
@@ -8,6 +8,7 @@
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/** Builds a Streaming Ingest channel for a specific Streaming Ingest client */
class SnowflakeStreamingIngestChannelFactory {
@@ -30,6 +31,7 @@ static class SnowflakeStreamingIngestChannelBuilder<T> {
private OpenChannelRequest.OnErrorOption onErrorOption;
private ZoneId defaultTimezone;
private OffsetTokenVerificationFunction offsetTokenVerificationFunction;
private ParquetProperties.WriterVersion parquetWriterVersion;

private SnowflakeStreamingIngestChannelBuilder(String name) {
this.name = name;
@@ -98,6 +100,12 @@ SnowflakeStreamingIngestChannelBuilder<T> setOffsetTokenVerificationFunction(
return this;
}

SnowflakeStreamingIngestChannelBuilder<T> setParquetWriterVersion(
ParquetProperties.WriterVersion parquetWriterVersion) {
this.parquetWriterVersion = parquetWriterVersion;
return this;
}

SnowflakeStreamingIngestChannelInternal<T> build() {
Utils.assertStringNotNullOrEmpty("channel name", this.name);
Utils.assertStringNotNullOrEmpty("table name", this.tableName);
@@ -123,8 +131,8 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.offsetTokenVerificationFunction);
this.offsetTokenVerificationFunction,
this.parquetWriterVersion);
}
}
}
@@ -10,7 +10,6 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -19,17 +18,18 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* The first version of implementation for SnowflakeStreamingIngestChannel
@@ -68,48 +68,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
private final MemoryInfoProvider memoryInfoProvider;
private volatile long freeMemoryInBytes = 0;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
* @param name
* @param dbName
* @param schemaName
* @param tableName
* @param offsetToken
* @param channelSequencer
* @param rowSequencer
* @param client
*/
SnowflakeStreamingIngestChannelInternal(
String name,
String dbName,
String schemaName,
String tableName,
String offsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone) {
this(
name,
dbName,
schemaName,
tableName,
offsetToken,
channelSequencer,
rowSequencer,
client,
encryptionKey,
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion(),
null);
}

/** Default constructor */
SnowflakeStreamingIngestChannelInternal(
String name,
@@ -119,13 +77,13 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String endOffsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
@Nonnull SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion,
OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.isClosed = false;
this.owningClient = client;

@@ -147,13 +105,14 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
AbstractRowBuffer.createRowBuffer(
onErrorOption,
defaultTimezone,
bdecVersion,
client.getParameterProvider().getBlobFormatVersion(),
getFullyQualifiedName(),
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient),
offsetTokenVerificationFunction,
owningClient == null ? null : owningClient.getTelemetryService());
parquetWriterVersion,
owningClient.getTelemetryService());
this.tableColumns = new HashMap<>();
logger.logInfo(
"Channel={} created for table={}",
@@ -71,6 +71,7 @@
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* The first version of implementation for SnowflakeStreamingIngestClient. The client internally
@@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
}

if (isIcebergMode
&& response.getTableColumns().stream()
.anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
if (isIcebergMode) {
if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
}

if (response.getIcebergSerializationPolicy() == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Iceberg Table's open channel response does not have serialization policy set.");
}
}

logger.logInfo(
@@ -386,6 +393,12 @@
.setOnErrorOption(request.getOnErrorOption())
.setDefaultTimezone(request.getDefaultTimezone())
.setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction())
.setParquetWriterVersion(
isIcebergMode
? Constants.IcebergSerializationPolicy.valueOf(
response.getIcebergSerializationPolicy())
.toParquetWriterVersion()
: ParquetProperties.WriterVersion.PARQUET_1_0)
.build();

// Setup the row buffer schema
25 changes: 24 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
@@ -71,9 +72,31 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

/**
* Iceberg table serialization policy. Use v2 parquet writer for optimized serialization,
* otherwise v1.
*/
public enum IcebergSerializationPolicy {
COMPATIBLE,
OPTIMIZED;

public ParquetProperties.WriterVersion toParquetWriterVersion() {
switch (this) {
case COMPATIBLE:
return ParquetProperties.WriterVersion.PARQUET_1_0;
Does this mean that non-Iceberg tables (which are Snowflake-managed tables, AFAIK) only support Parquet V1?

Contributor Author

The server-side scanner for FDN tables supports Parquet V2. This PR is specific to the Iceberg table feature and does not alter the default behavior for streaming to FDN tables.

Thanks!

case OPTIMIZED:
return ParquetProperties.WriterVersion.PARQUET_2_0;
default:
throw new IllegalArgumentException(
String.format(
"Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s",
this.name(), Arrays.asList(IcebergSerializationPolicy.values())));
}
}
}

public enum WriteMode {
CLOUD_STORAGE,
REST_API,
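A short, hedged usage sketch of the enum added above, mirroring how openChannel resolves the server-provided policy string into a writer version:

import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.column.ParquetProperties;

public class IcebergSerializationPolicySketch {
  public static void main(String[] args) {
    // COMPATIBLE keeps the V1 writer; OPTIMIZED switches to the V2 writer.
    ParquetProperties.WriterVersion compatible =
        Constants.IcebergSerializationPolicy.valueOf("COMPATIBLE").toParquetWriterVersion();
    ParquetProperties.WriterVersion optimized =
        Constants.IcebergSerializationPolicy.valueOf("OPTIMIZED").toParquetWriterVersion();
    System.out.println(compatible); // PARQUET_1_0
    System.out.println(optimized);  // PARQUET_2_0
  }
}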