Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1760732 Enable Iceberg mode from properties #871

Merged
merged 7 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ public static class Builder {
// Indicates whether it's under test mode
private boolean isTestMode;

// Whether we are going to ingest into iceberg tables
private boolean isIceberg;

private Builder(String name) {
this.name = name;
}
Expand All @@ -53,12 +50,6 @@ public Builder setIsTestMode(boolean isTestMode) {
return this;
}

// do not make public until the feature is ready
Builder setIsIceberg(boolean isIceberg) {
this.isIceberg = isIceberg;
return this;
}

public SnowflakeStreamingIngestClient build() {
Utils.assertStringNotNullOrEmpty("client name", this.name);
Utils.assertNotNull("connection properties", this.prop);
Expand All @@ -67,7 +58,7 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode);
this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class ClientBufferParameters {

private final Optional<Integer> maxRowGroups;

private boolean isIcebergMode;
private boolean enableIcebergStreaming;

private boolean enableDistinctValuesCount;

Expand All @@ -34,23 +34,23 @@ public class ClientBufferParameters {
*
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode
* @param enableIcebergStreaming
*/
private ClientBufferParameters(
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode,
boolean enableIcebergStreaming,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.isIcebergMode = isIcebergMode;
this.enableIcebergStreaming = enableIcebergStreaming;
this.enableDistinctValuesCount = enableDistinctValuesCount;
this.enableValuesCount = enableValuesCount;
}
Expand All @@ -73,12 +73,12 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.isIcebergMode =
this.enableIcebergStreaming =
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
? clientInternal.getParameterProvider().isEnableIcebergStreaming()
: ParameterProvider.ENABLE_ICEBERG_STREAMING_DEFAULT;
this.maxRowGroups =
isIcebergMode
enableIcebergStreaming
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
: Optional.empty();
this.enableDistinctValuesCount =
Expand All @@ -94,7 +94,7 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
/**
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode
* @param enableIcebergStreaming
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
Expand All @@ -103,7 +103,7 @@ public static ClientBufferParameters test_createClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
boolean isIcebergMode,
boolean enableIcebergStreaming,
boolean enableDistinctValuesCount,
boolean enableValuesCount) {
return new ClientBufferParameters(
Expand All @@ -112,7 +112,7 @@ public static ClientBufferParameters test_createClientBufferParameters(
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
isIcebergMode,
enableIcebergStreaming,
enableDistinctValuesCount,
enableValuesCount);
}
Expand All @@ -133,16 +133,16 @@ public boolean isEnableNewJsonParsingLogic() {
return enableNewJsonParsingLogic;
}

public boolean getIsIcebergMode() {
return isIcebergMode;
public boolean getEnableIcebergStreaming() {
return enableIcebergStreaming;
}

public Optional<Integer> getMaxRowGroups() {
return maxRowGroups;
}

public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
return enableIcebergStreaming ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public boolean isEnableDistinctValuesCount() {
Expand All @@ -154,6 +154,6 @@ public boolean isEnableValuesCount() {
}

public boolean isEnableDictionaryEncoding() {
return isIcebergMode;
return enableIcebergStreaming;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

/**
Expand Down Expand Up @@ -67,7 +71,7 @@ public boolean isNullable() {
* here is meant to conform to the json schema specified here:
* https://iceberg.apache.org/spec/#appendix-c-json-serialization
*
* <p>Make this a public API when the Builder.setIsIceberg API is made public.
* <p>Make this a public API when the Builder.setEnableIcebergStreaming API is made public.
*/
String getIcebergSchema() {
return icebergColumnSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
Long clientSequencer;

@JsonProperty("is_iceberg")
boolean isIceberg;
boolean enableIcebergStreaming;

DropChannelRequestInternal(
String requestId,
Expand All @@ -43,15 +43,15 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
String schema,
String table,
String channel,
boolean isIceberg,
boolean enableIcebergStreaming,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
this.channel = channel;
this.isIceberg = isIceberg;
this.enableIcebergStreaming = enableIcebergStreaming;
this.clientSequencer = clientSequencer;
}

Expand Down Expand Up @@ -79,8 +79,8 @@ String getSchema() {
return schema;
}

boolean isIceberg() {
return isIceberg;
boolean enableIcebergStreaming() {
return enableIcebergStreaming;
}

Long getClientSequencer() {
Expand All @@ -95,7 +95,7 @@ String getFullyQualifiedTableName() {
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " isIceberg=%s, clientSequencer=%s)",
requestId, role, database, schema, table, channel, isIceberg, clientSequencer);
+ " enableIcebergStreaming=%s, clientSequencer=%s)",
requestId, role, database, schema, table, channel, enableIcebergStreaming, clientSequencer);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -25,23 +29,23 @@ class GeneratePresignedUrlsRequest implements IStreamingIngestRequest {
private Long deploymentGlobalId;

@JsonProperty("is_iceberg")
private boolean isIceberg;
private boolean enableIcebergStreaming;

public GeneratePresignedUrlsRequest(
TableRef tableRef,
String role,
int count,
int timeoutInSeconds,
Long deploymentGlobalId,
boolean isIceberg) {
boolean enableIcebergStreaming) {
this.dbName = tableRef.dbName;
this.schemaName = tableRef.schemaName;
this.tableName = tableRef.tableName;
this.count = count;
this.role = role;
this.timeoutInSeconds = timeoutInSeconds;
this.deploymentGlobalId = deploymentGlobalId;
this.isIceberg = isIceberg;
this.enableIcebergStreaming = enableIcebergStreaming;
}

String getDBName() {
Expand Down Expand Up @@ -72,22 +76,22 @@ Integer getTimeoutInSeconds() {
return this.timeoutInSeconds;
}

boolean getIsIceberg() {
return this.isIceberg;
boolean getEnableIcebergStreaming() {
return this.enableIcebergStreaming;
}

@Override
public String getStringForLogging() {
return String.format(
"GetPresignedUrlsRequest(db=%s, schema=%s, table=%s, count=%s, timeoutInSeconds=%s"
+ " deploymentGlobalId=%s role=%s, isIceberg=%s)",
+ " deploymentGlobalId=%s role=%s, enableIcebergStreaming=%s)",
dbName,
schemaName,
tableName,
count,
timeoutInSeconds,
deploymentGlobalId,
role,
isIceberg);
enableIcebergStreaming);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,40 @@ class InternalParameterProvider {
public static final boolean ENABLE_DISTINCT_VALUES_COUNT_DEFAULT = false;
public static final boolean ENABLE_VALUES_COUNT_DEFAULT = false;

private final boolean isIcebergMode;
private final boolean enableIcebergStreaming;

InternalParameterProvider(boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
InternalParameterProvider(boolean enableIcebergStreaming) {
this.enableIcebergStreaming = enableIcebergStreaming;
}

boolean getEnableChunkEncryption() {
// When in Iceberg mode, chunk encryption is disabled. Otherwise, it is enabled. Since Iceberg
// mode does not need client-side encryption.
return !isIcebergMode;
return !enableIcebergStreaming;
}

boolean setAllDefaultValuesInEp() {
// When in non-iceberg mode, we want to default the stats for all data types (int/real/string)
// to 0 / to "".
// However when in iceberg mode, we want to default only those stats that are
// relevant to the current datatype.
return !isIcebergMode;
return !enableIcebergStreaming;
}

boolean setIcebergSpecificFieldsInEp() {
// When in Iceberg mode, we need to explicitly populate the major and minor version of parquet
// in the EP metadata, createdOn, and extendedMetadataSize.
return isIcebergMode;
return enableIcebergStreaming;
}

boolean isEnableDistinctValuesCount() {
// When in Iceberg mode, we enabled distinct values count in EP metadata.
return isIcebergMode;
return enableIcebergStreaming;
}

boolean isEnableValuesCount() {
// When in Iceberg mode, we enabled values count in EP metadata for repeated group (e.g. map,
// list).
return isIcebergMode;
return enableIcebergStreaming;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest {

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("is_iceberg")
private boolean isIceberg;
private boolean enableIcebergStreaming;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("offset_token")
Expand All @@ -48,7 +48,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest {
String table,
String channel,
Constants.WriteMode writeMode,
boolean isIceberg,
boolean enableIcebergStreaming,
String offsetToken) {
this.requestId = requestId;
this.role = role;
Expand All @@ -57,7 +57,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest {
this.table = table;
this.channel = channel;
this.writeMode = writeMode.name();
this.isIceberg = isIceberg;
this.enableIcebergStreaming = enableIcebergStreaming;
this.offsetToken = offsetToken;
}

Expand Down Expand Up @@ -89,8 +89,8 @@ String getWriteMode() {
return writeMode;
}

boolean getIsIceberg() {
return isIceberg;
boolean getEnableIcebergStreaming() {
return enableIcebergStreaming;
}

String getOffsetToken() {
Expand All @@ -101,7 +101,7 @@ String getOffsetToken() {
public String getStringForLogging() {
return String.format(
"OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " writeMode=%s, isIceberg=%s)",
requestId, role, database, schema, table, channel, writeMode, isIceberg);
+ " writeMode=%s, enableIcebergStreaming=%s)",
requestId, role, database, schema, table, channel, writeMode, enableIcebergStreaming);
}
}
Loading
Loading