(Stacked PR) Code review changes for Parquet V2 (#862)
1. Remove the enum-conversion logic from the OpenChannelResponse contract class; its getter now returns the raw serialization-policy string.
2. Move writerVersion defaulting to the channel-construction call site in clientInternal (out of the channel constructor), instead of passing writerVersion=null into the channel.
3. Pass writerVersion through the RowBuffer into the Flusher, instead of via the per-chunk flushContext.
4. Remove a test-only constructor overload of ChannelInternal.
5. Remove an unnecessary parameter (bdecVersion) from the ChannelInternal constructor.
6. Remove IcebergSerializationPolicy.NON_ICEBERG, drop the custom fromName method, and use the Enum.valueOf that Java already provides (a sketch of the resulting call-site mapping follows this list).
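
A minimal sketch of how items 2 and 6 fit together at the channel-open call site, assuming only the names visible in the diffs below (Constants.IcebergSerializationPolicy, its toParquetWriterVersion(), and ParquetProperties.WriterVersion); the helper class and method are hypothetical, not code from this commit:

import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.column.ParquetProperties;

class WriterVersionDefaultingSketch {
  // Iceberg tables map the server-provided serialization policy to a Parquet writer
  // version; everything else stays on the classic v1 writer. Defaulting now lives at
  // the call site, so the channel constructor never sees a null writer version.
  static ParquetProperties.WriterVersion resolve(boolean isIcebergMode, String policyName) {
    if (!isIcebergMode) {
      return ParquetProperties.WriterVersion.PARQUET_1_0;
    }
    // Enum.valueOf replaces the removed custom fromName(); unknown names throw
    // IllegalArgumentException, and a null policy is rejected earlier in openChannel.
    return Constants.IcebergSerializationPolicy.valueOf(policyName).toParquetWriterVersion();
  }
}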
sfc-gh-hmadan authored Oct 12, 2024
1 parent 1784af8 commit 7356bee
Showing 17 changed files with 172 additions and 164 deletions.
@@ -25,6 +25,7 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;

/**
* The abstract implementation of the buffer in the Streaming Ingest channel that holds the
@@ -668,6 +669,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
switch (bdecVersion) {
case THREE:
@@ -681,6 +683,7 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
channelRuntimeState,
clientBufferParameters,
offsetTokenVerificationFunction,
parquetWriterVersion,
telemetryService);
default:
throw new SFException(
@@ -5,7 +5,6 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* Channel immutable identification and encryption attributes.
@@ -30,17 +29,14 @@ class ChannelFlushContext {
// Data encryption key id
private final Long encryptionKeyId;

private final ParquetProperties.WriterVersion parquetWriterVersion;

ChannelFlushContext(
String name,
String dbName,
String schemaName,
String tableName,
Long channelSequencer,
String encryptionKey,
Long encryptionKeyId,
ParquetProperties.WriterVersion parquetWriterVersion) {
Long encryptionKeyId) {
this.name = name;
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
@@ -51,7 +47,6 @@ class ChannelFlushContext {
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
this.parquetWriterVersion = parquetWriterVersion;
}

@Override
@@ -120,8 +115,4 @@ String getEncryptionKey() {
Long getEncryptionKeyId() {
return encryptionKeyId;
}

ParquetProperties.WriterVersion getParquetWriterVersion() {
return parquetWriterVersion;
}
}
@@ -6,7 +6,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;

/** Response to the OpenChannelRequest */
class OpenChannelResponse extends StreamingIngestResponse {
@@ -148,7 +147,7 @@ void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
this.icebergSerializationPolicy = icebergSerializationPolicy;
}

Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() {
return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy);
String getIcebergSerializationPolicy() {
return this.icebergSerializationPolicy;
}
}
@@ -31,6 +31,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;

/** Construct parquet flusher from its schema. */
@@ -39,11 +40,13 @@ public ParquetFlusher(
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
}

@@ -66,7 +69,6 @@ private SerializationResult serializeFromJavaObjects(
BdecParquetWriter parquetWriter;
ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
ParquetProperties.WriterVersion parquetWriterVersion = null;

for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
// Create channel metadata
@@ -108,15 +110,6 @@
chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
}

// Check if all the channels have the same parquet writer version
if (parquetWriterVersion == null) {
parquetWriterVersion = data.getChannelContext().getParquetWriterVersion();
} else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Parquet writer version and storage serialization policy mismatch within a chunk");
}

rows.addAll(data.getVectors().rows);

rowCount += data.getRowCount();
@@ -26,6 +26,7 @@
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -45,6 +46,8 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
private final List<List<Object>> data;
private final List<List<Object>> tempData;

private final ParquetProperties.WriterVersion parquetWriterVersion;

private MessageType schema;

/** Construct a ParquetRowBuffer object. */
@@ -56,6 +59,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
ChannelRuntimeState channelRuntimeState,
ClientBufferParameters clientBufferParameters,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion,
TelemetryService telemetryService) {
super(
onErrorOption,
@@ -70,6 +74,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
this.tempData = new ArrayList<>();
this.parquetWriterVersion = parquetWriterVersion;
}

/**
@@ -395,6 +400,7 @@ public Flusher<ParquetChunkData> createFlusher() {
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression(),
parquetWriterVersion,
clientBufferParameters.isEnableDictionaryEncoding());
}
}
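
For item 3, a rough illustration under the constructor signatures shown above: the writer version is fixed when the buffer is built and forwarded unchanged by createFlusher(), which is why the per-chunk "writer version mismatch" check deleted from ParquetFlusher.serializeFromJavaObjects is no longer needed: every chunk a flusher sees shares one version by construction. The wrapper class, method, and argument names below are illustrative only:

import java.util.Optional;
import net.snowflake.ingest.streaming.internal.ParquetFlusher;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;

class FlusherWiringSketch {
  // Mirrors what ParquetRowBuffer.createFlusher() now does: hand the version captured at
  // buffer construction straight to the flusher (a test can likewise pin it directly).
  static ParquetFlusher newFlusher(
      MessageType schema,
      long maxChunkSizeInBytes,
      Optional<Integer> maxRowGroups,
      Constants.BdecParquetCompression compression,
      ParquetProperties.WriterVersion parquetWriterVersion,
      boolean enableDictionaryEncoding) {
    return new ParquetFlusher(
        schema,
        maxChunkSizeInBytes,
        maxRowGroups,
        compression,
        parquetWriterVersion, // new constructor argument added in this commit
        enableDictionaryEncoding);
  }
}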
@@ -131,7 +131,6 @@ SnowflakeStreamingIngestChannelInternal<T> build() {
this.encryptionKeyId,
this.onErrorOption,
this.defaultTimezone,
this.owningClient.getParameterProvider().getBlobFormatVersion(),
this.offsetTokenVerificationFunction,
this.parquetWriterVersion);
}
@@ -10,7 +10,6 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -19,13 +18,13 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
@@ -69,49 +68,6 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
private final MemoryInfoProvider memoryInfoProvider;
private volatile long freeMemoryInBytes = 0;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
* @param name
* @param dbName
* @param schemaName
* @param tableName
* @param offsetToken
* @param channelSequencer
* @param rowSequencer
* @param client
*/
SnowflakeStreamingIngestChannelInternal(
String name,
String dbName,
String schemaName,
String tableName,
String offsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneOffset defaultTimezone) {
this(
name,
dbName,
schemaName,
tableName,
offsetToken,
channelSequencer,
rowSequencer,
client,
encryptionKey,
encryptionKeyId,
onErrorOption,
defaultTimezone,
client.getParameterProvider().getBlobFormatVersion(),
null /* offsetTokenVerificationFunction */,
null /* parquetWriterVersion */);
}

/** Default constructor */
SnowflakeStreamingIngestChannelInternal(
String name,
@@ -121,12 +77,11 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String endOffsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
@Nonnull SnowflakeStreamingIngestClientInternal<T> client,
String encryptionKey,
Long encryptionKeyId,
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
Constants.BdecVersion bdecVersion,
OffsetTokenVerificationFunction offsetTokenVerificationFunction,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.isClosed = false;
@@ -144,28 +99,20 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance();
this.channelFlushContext =
new ChannelFlushContext(
name,
dbName,
schemaName,
tableName,
channelSequencer,
encryptionKey,
encryptionKeyId,
parquetWriterVersion == null
? ParquetProperties.DEFAULT_WRITER_VERSION
: parquetWriterVersion);
name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true);
this.rowBuffer =
AbstractRowBuffer.createRowBuffer(
onErrorOption,
defaultTimezone,
bdecVersion,
client.getParameterProvider().getBlobFormatVersion(),
getFullyQualifiedName(),
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient),
offsetTokenVerificationFunction,
owningClient == null ? null : owningClient.getTelemetryService());
parquetWriterVersion,
owningClient.getTelemetryService());
this.tableColumns = new HashMap<>();
logger.logInfo(
"Channel={} created for table={}",
@@ -71,6 +71,7 @@
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.SnowflakeURL;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* The first version of implementation for SnowflakeStreamingIngestClient. The client internally
@@ -355,11 +356,17 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
}

if (isIcebergMode
&& response.getTableColumns().stream()
.anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
if (isIcebergMode) {
if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
}

if (response.getIcebergSerializationPolicy() == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Iceberg Table's open channel response does not have serialization policy set.");
}
}

logger.logInfo(
@@ -387,7 +394,11 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
.setDefaultTimezone(request.getDefaultTimezone())
.setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction())
.setParquetWriterVersion(
response.getIcebergSerializationPolicy().getParquetWriterVersion())
isIcebergMode
? Constants.IcebergSerializationPolicy.valueOf(
response.getIcebergSerializationPolicy())
.toParquetWriterVersion()
: ParquetProperties.WriterVersion.PARQUET_1_0)
.build();

// Setup the row buffer schema
19 changes: 1 addition & 18 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -79,28 +79,11 @@ public class Constants {
* otherwise v1.
*/
public enum IcebergSerializationPolicy {
NON_ICEBERG,
COMPATIBLE,
OPTIMIZED;

public static IcebergSerializationPolicy fromName(String name) {
if (name == null) {
return NON_ICEBERG;
}
for (IcebergSerializationPolicy e : IcebergSerializationPolicy.values()) {
if (e.name().equalsIgnoreCase(name)) {
return e;
}
}
throw new IllegalArgumentException(
String.format(
"Unsupported ICEBERG_SERIALIZATION_POLICY = '%s', allowed values are %s",
name, Arrays.asList(IcebergSerializationPolicy.values())));
}

public ParquetProperties.WriterVersion getParquetWriterVersion() {
public ParquetProperties.WriterVersion toParquetWriterVersion() {
switch (this) {
case NON_ICEBERG:
case COMPATIBLE:
return ParquetProperties.WriterVersion.PARQUET_1_0;
case OPTIMIZED:
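
One behavioral note on item 6, relying only on standard java.lang.Enum semantics: the removed fromName() matched names case-insensitively and mapped null to NON_ICEBERG, whereas Enum.valueOf is strict (case-sensitive, IllegalArgumentException on unknown names, NullPointerException on null), which is why openChannel above now rejects a missing policy explicitly before calling valueOf. A small illustration; the class and main method are hypothetical:

import net.snowflake.ingest.utils.Constants.IcebergSerializationPolicy;

class SerializationPolicySketch {
  public static void main(String[] args) {
    // Exact constant names resolve as before:
    IcebergSerializationPolicy p = IcebergSerializationPolicy.valueOf("OPTIMIZED");
    // Per the enum's javadoc ("... otherwise v1"), OPTIMIZED maps to the v2 writer:
    System.out.println(p.toParquetWriterVersion());

    // Unlike the removed fromName(), valueOf is strict:
    // IcebergSerializationPolicy.valueOf("optimized") -> IllegalArgumentException
    // IcebergSerializationPolicy.valueOf(null)        -> NullPointerException
  }
}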
(Diffs for the remaining changed files are not shown in this view.)
