Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 25, 2024
1 parent 2ecd4c9 commit b12ec41
Show file tree
Hide file tree
Showing 16 changed files with 61 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.isIcebergMode =
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
? clientInternal.getParameterProvider().isIcebergMode()
: ParameterProvider.STREAMING_ICEBERG_DEFAULT;
this.maxRowGroups =
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;
private final boolean isIcebergMode;

/** Construct parquet flusher from its schema. */
public ParquetFlusher(
Expand All @@ -42,15 +41,13 @@ public ParquetFlusher(
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding,
boolean isIcebergMode) {
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
this.isIcebergMode = isIcebergMode;
}

@Override
Expand Down Expand Up @@ -130,9 +127,7 @@ private SerializationResult serializeFromJavaObjects(
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables
metadata.put(
isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
parquetWriter =
new SnowflakeParquetWriter(
mergedData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,6 @@ public Flusher<ParquetChunkData> createFlusher() {
clientBufferParameters.getMaxRowGroups(),
clientBufferParameters.getBdecParquetCompression(),
parquetWriterVersion,
parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0
&& clientBufferParameters
.isEnableDictionaryEncoding() /* writer 1.0 does not support dictionary encoding*/,
clientBufferParameters.getIsIcebergMode());
clientBufferParameters.isEnableDictionaryEncoding());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
// Indicates whether the client has closed
private volatile boolean isClosed;

// Indicates whether the client is streaming to Iceberg tables
private final boolean isIcebergMode;

// Indicates whether the client is under test mode
private final boolean isTestMode;

Expand Down Expand Up @@ -164,10 +161,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this.isIcebergMode =
prop != null && Boolean.parseBoolean(prop.getProperty(Constants.STREAMING_ICEBERG));
this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode);
this.internalParameterProvider = new InternalParameterProvider(isIcebergMode);
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
this.internalParameterProvider =
new InternalParameterProvider(parameterProvider.isIcebergMode());

this.name = name;
String accountName = accountURL == null ? null : accountURL.getAccount();
Expand Down Expand Up @@ -236,7 +232,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder);

this.storageManager =
isIcebergMode
parameterProvider.isIcebergMode()
? new PresignedUrlExternalVolumeManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager<T>(
Expand Down Expand Up @@ -346,14 +342,14 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
request.getTableName(),
request.getChannelName(),
Constants.WriteMode.CLOUD_STORAGE,
this.isIcebergMode,
this.parameterProvider.isIcebergMode(),
request.getOffsetToken());
response = snowflakeServiceClient.openChannel(openChannelRequest);
} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
}

if (isIcebergMode) {
if (parameterProvider.isIcebergMode()) {
if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set");
Expand Down Expand Up @@ -391,7 +387,7 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
.setDefaultTimezone(request.getDefaultTimezone())
.setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction())
.setParquetWriterVersion(
isIcebergMode
parameterProvider.isIcebergMode()
? Constants.IcebergSerializationPolicy.valueOf(
response.getIcebergSerializationPolicy())
.toParquetWriterVersion()
Expand Down Expand Up @@ -432,7 +428,7 @@ public void dropChannel(DropChannelRequest request) {
request.getSchemaName(),
request.getTableName(),
request.getChannelName(),
this.isIcebergMode,
this.parameterProvider.isIcebergMode(),
request instanceof DropChannelVersionRequest
? ((DropChannelVersionRequest) request).getClientSequencer()
: null);
Expand Down Expand Up @@ -592,7 +588,7 @@ void registerBlobs(List<BlobMetadata> blobs, final int executionCount) {
this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(),
this.role,
blobs,
this.isIcebergMode);
this.parameterProvider.isIcebergMode());
response = snowflakeServiceClient.registerBlob(request, executionCount);
} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage());
Expand Down Expand Up @@ -939,11 +935,6 @@ public void setRefreshToken(String refreshToken) {
}
}

/**
 * Return whether the client is streaming to Iceberg tables.
 *
 * <p>Package-private accessor over the client's cached flag; reads the {@code isIcebergMode}
 * field set at construction time.
 *
 * @return true if this client was configured for Iceberg table streaming
 */
boolean isIcebergMode() {
  return isIcebergMode;
}

/**
* Registers the performance metrics along with JVM memory and Threads.
*
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ public class Constants {
public static final String OAUTH_CLIENT_SECRET = "oauth_client_secret";
public static final String OAUTH_REFRESH_TOKEN = "oauth_refresh_token";
public static final String OAUTH_TOKEN_ENDPOINT = "oauth_token_endpoint";
public static final String STREAMING_ICEBERG = "streaming_iceberg";
public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request";
public static final String PRIMARY_FILE_ID_KEY =
"primaryFileId"; // Don't change, should match Parquet Scanner
public static final String FULL_FILL_NAME_KEY =
"fullFillNameKey"; // Don't change, should match Parquet Scanner
public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
10L; // Don't change, should match server side
Expand Down
58 changes: 35 additions & 23 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class ParameterProvider {
public static final String ENABLE_NEW_JSON_PARSING_LOGIC =
"ENABLE_NEW_JSON_PARSING_LOGIC".toLowerCase();

public static final String STREAMING_ICEBERG = "STREAMING_ICEBERG".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
Expand All @@ -73,22 +75,17 @@ public class ParameterProvider {

public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;
public static final Constants.BdecParquetCompression
PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT = Constants.BdecParquetCompression.ZSTD;

/* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */
public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;

public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true;

public static final boolean IS_ICEBERG_MODE_DEFAULT = false;
public static final boolean STREAMING_ICEBERG_DEFAULT = false;

/** Map of parameter name to parameter value. This will be set by client/configure API Call. */
private final Map<String, Object> parameterMap = new HashMap<>();

/* Iceberg mode flag */
private final boolean isIcebergMode;

// Cached buffer flush interval - avoid parsing each time for quick lookup
private Long cachedBufferFlushIntervalMs = -1L;

Expand All @@ -98,18 +95,23 @@ public class ParameterProvider {
*
* @param parameterOverrides Map of parameter name to value
* @param props Properties from profile file
* @param isIcebergMode If the provided parameters need to be verified and modified to meet
* Iceberg mode
*/
public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
this.setParameterMap(parameterOverrides, props);
}

/** Constructor for tests */
public ParameterProvider(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
this.setParameterMap(parameterOverrides, props, isIcebergMode);
if (parameterOverrides != null) {
parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode);
}
this.setParameterMap(parameterOverrides, props);
}

/** Empty constructor for tests */
public ParameterProvider(boolean isIcebergMode) {
this(null, null, isIcebergMode);
this(new HashMap<>(), null, isIcebergMode);
}

private void checkAndUpdate(
Expand Down Expand Up @@ -139,11 +141,8 @@ private void checkAndUpdate(
*
* @param parameterOverrides Map<String, Object> of parameter name -> value
* @param props Properties file provided to client constructor
* @param isIcebergMode If the provided parameters need to be verified and modified to meet
* Iceberg mode
*/
private void setParameterMap(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
// BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed
if ((parameterOverrides != null
&& parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS))
Expand All @@ -154,6 +153,13 @@ private void setParameterMap(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG));
}

this.checkAndUpdate(
STREAMING_ICEBERG,
STREAMING_ICEBERG_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);

this.checkAndUpdate(
BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS,
BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT,
Expand Down Expand Up @@ -234,17 +240,17 @@ private void setParameterMap(

this.checkAndUpdate(
MAX_CLIENT_LAG,
isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT,
isIcebergMode() ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);

this.checkAndUpdate(
MAX_CHUNKS_IN_BLOB,
isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT,
isIcebergMode() ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT,
parameterOverrides,
props,
isIcebergMode);
isIcebergMode());

this.checkAndUpdate(
MAX_CHUNKS_IN_REGISTRATION_REQUEST,
Expand All @@ -255,9 +261,7 @@ private void setParameterMap(

this.checkAndUpdate(
BDEC_PARQUET_COMPRESSION_ALGORITHM,
isIcebergMode
? BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT
: PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT,
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);
Expand Down Expand Up @@ -292,7 +296,7 @@ private long getMaxClientLagInMs() {
Object val =
this.parameterMap.getOrDefault(
MAX_CLIENT_LAG,
isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT);
isIcebergMode() ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT);
long computedLag;
if (val instanceof String) {
String maxLag = (String) val;
Expand Down Expand Up @@ -472,7 +476,7 @@ public int getMaxChunksInBlob() {
Object val =
this.parameterMap.getOrDefault(
MAX_CHUNKS_IN_BLOB,
isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT);
isIcebergMode() ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT);
return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
}

Expand Down Expand Up @@ -503,6 +507,14 @@ public boolean isEnableNewJsonParsingLogic() {
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/**
 * Reports whether this client is configured for Iceberg table streaming.
 *
 * @return true when the {@code STREAMING_ICEBERG} parameter resolves to true; falls back to
 *     {@code STREAMING_ICEBERG_DEFAULT} when the parameter is unset
 */
public boolean isIcebergMode() {
  final Object configured =
      this.parameterMap.getOrDefault(STREAMING_ICEBERG, STREAMING_ICEBERG_DEFAULT);
  if (configured instanceof String) {
    return Boolean.parseBoolean((String) configured);
  }
  return (boolean) configured;
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metada
isIceberg
? ParquetProperties.WriterVersion.PARQUET_2_0
: ParquetProperties.WriterVersion.PARQUET_1_0,
isIceberg,
isIceberg))
.when(channelData)
.createFlusher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.apache.parquet.column.ParquetProperties;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void setup() {
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
Properties prop = new Properties();
prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
client =
new SnowflakeStreamingIngestClientInternal<>(
"client", null, prop, httpClient, true, requestBuilder, new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
}

void setParameterOverride(Map<String, Object> parameterOverride) {
this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode);
this.parameterProvider = new ParameterProvider(parameterOverride, null);
}

ChannelData<T> flushChannel(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;
import org.junit.Assert;
Expand Down Expand Up @@ -58,7 +58,7 @@ public void setUpBeforeAll() {
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
Properties prop = new Properties();
prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
client =
new SnowflakeStreamingIngestClientInternal<>(
"client_PARQUET", null, prop, httpClient, true, requestBuilder, new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ public void withNullInputs() {
ParameterProvider parameterProvider = new ParameterProvider(null, null, isIcebergMode);

Assert.assertEquals(
isIcebergMode
? ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT
: ParameterProvider.MAX_CLIENT_LAG_DEFAULT,
parameterProvider.getCachedMaxClientLagInMs());
ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs());
Assert.assertEquals(
ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT,
parameterProvider.getBufferFlushCheckIntervalInMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import java.util.concurrent.TimeoutException;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -44,7 +44,7 @@ public void setup() {
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
Properties prop = new Properties();
prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode));
client =
new SnowflakeStreamingIngestClientInternal<>(
"client", null, prop, httpClient, true, requestBuilder, new HashMap<>());
Expand Down
Loading

0 comments on commit b12ec41

Please sign in to comment.