Skip to content

Commit

Permalink
SNOW-1437885 Disable blob interleaving when in Iceberg mode (#763)
Browse files Browse the repository at this point in the history
* Add parameters & disable interleaving mode
  • Loading branch information
sfc-gh-alhuang committed Jul 10, 2024
1 parent f5669d1 commit 2b26107
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should
// fail in this mode.
private boolean isIcebergMode;

// Indicates whether it's under test mode
private boolean isTestMode;

Expand All @@ -45,6 +49,11 @@ public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
return this;
}

public Builder setIsIceberg(boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
return this;
}

public Builder setIsTestMode(boolean isTestMode) {
this.isTestMode = isTestMode;
return this;
Expand All @@ -58,7 +67,12 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
this.name,
accountURL,
prop,
this.parameterOverrides,
this.isIcebergMode,
this.isTestMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
// Indicates whether the client has closed
private volatile boolean isClosed;

// Indicates wheter the client is streaming to Iceberg tables
private final boolean isIcebergMode;

// Indicates whether the client is under test mode
private final boolean isTestMode;

Expand Down Expand Up @@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param accountURL Snowflake account url
* @param prop connection properties
* @param httpClient http client for sending request
* @param isIcebergMode whether we're streaming to iceberg tables
* @param isTestMode whether we're under test mode
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
Expand All @@ -161,13 +165,15 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
SnowflakeURL accountURL,
Properties prop,
CloseableHttpClient httpClient,
boolean isIcebergMode,
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode);

this.name = name;
String accountName = accountURL == null ? null : accountURL.getAccount();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient;
this.channelCache = new ChannelCache<>();
Expand Down Expand Up @@ -251,23 +257,25 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param accountURL Snowflake account url
* @param prop connection properties
* @param parameterOverrides map of parameters to override for this client
* @param isIcebergMode whether we're streaming to iceberg tables
* @param isTestMode indicates whether it's under test mode
*/
public SnowflakeStreamingIngestClientInternal(
String name,
SnowflakeURL accountURL,
Properties prop,
Map<String, Object> parameterOverrides,
boolean isIcebergMode,
boolean isTestMode) {
this(name, accountURL, prop, null, isTestMode, null, parameterOverrides);
this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides);
}

/*** Constructor for TEST ONLY
*
* @param name the name of the client
*/
SnowflakeStreamingIngestClientInternal(String name) {
this(name, null, null, null, true, null, new HashMap<>());
SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) {
this(name, null, null, null, isIcebergMode, true, null, new HashMap<>());
}

// TESTING ONLY - inject the request builder
Expand Down
45 changes: 39 additions & 6 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package net.snowflake.ingest.utils;

import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -64,6 +66,10 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

/* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT =
1; // 1 parquet file per blob

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
Expand All @@ -80,14 +86,17 @@ public class ParameterProvider {
*
* @param parameterOverrides Map of parameter name to value
* @param props Properties from profile file
* @param isIcebergMode If the provided parameters need to be verified and modified to meet
* Iceberg mode
*/
public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
this.setParameterMap(parameterOverrides, props);
public ParameterProvider(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
this.setParameterMap(parameterOverrides, props, isIcebergMode);
}

/** Empty constructor for tests */
public ParameterProvider() {
this(null, null);
public ParameterProvider(boolean isIcebergMode) {
this(null, null, isIcebergMode);
}

private void updateValue(
Expand All @@ -99,6 +108,8 @@ private void updateValue(
this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue));
} else if (props != null) {
this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
} else {
this.parameterMap.put(key, defaultValue);
}
}

Expand All @@ -107,8 +118,11 @@ private void updateValue(
*
* @param parameterOverrides Map<String, Object> of parameter name -> value
* @param props Properties file provided to client constructor
* @param isIcebergMode If the provided parameters need to be verified and modified to meet
* Iceberg mode
*/
private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
private void setParameterMap(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
// BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed
if ((parameterOverrides != null
&& parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS))
Expand Down Expand Up @@ -179,7 +193,9 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties

this.updateValue(
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
isIcebergMode
? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
: MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
parameterOverrides,
props);

Expand All @@ -188,6 +204,13 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);

// Required parameter override for Iceberg mode
if (isIcebergMode) {
icebergModeValidation(
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT);
}
}

/** @return Longest interval in milliseconds between buffer flushes */
Expand Down Expand Up @@ -411,4 +434,14 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
}

private void icebergModeValidation(String key, Object expected) {
Object val = this.parameterMap.get(key);
if (!val.equals(expected)) {
throw new SFException(
INVALID_CONFIG_PARAMETER,
String.format(
"The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ChannelCacheTest {
@Before
public void setup() {
cache = new ChannelCache<>();
client = new SnowflakeStreamingIngestClientInternal<>("client");
client = new SnowflakeStreamingIngestClientInternal<>("client", false);
channel1 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER;
import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT;
import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;

import com.codahale.metrics.Histogram;
Expand Down Expand Up @@ -48,11 +49,22 @@
import net.snowflake.ingest.utils.SFException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
public class FlushServiceTest {

@Parameterized.Parameters(name = "isIcebergMode: {0}")
public static Object[] isIcebergMode() {
return new Object[] {false, true};
}

@Parameterized.Parameter public static boolean isIcebergMode;

public FlushServiceTest() {
this.testContextFactory = ParquetTestContext.createFactory();
}
Expand Down Expand Up @@ -86,7 +98,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
TestContext() {
stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
parameterProvider = new ParameterProvider();
parameterProvider = new ParameterProvider(isIcebergMode);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
channelCache = new ChannelCache<>();
Expand Down Expand Up @@ -586,7 +598,9 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
Math.ceil(
(double) numberOfRows
/ channelsPerTable
/ ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);
/ (isIcebergMode
? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
: ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT));

final TestContext<List<List<Object>>> testContext = testContextFactory.create();

Expand Down Expand Up @@ -861,7 +875,7 @@ public void testInvalidateChannels() {
// Create a new Client in order to not interfere with other tests
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
ParameterProvider parameterProvider = new ParameterProvider();
ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode);
ChannelCache<StubChunkData> channelCache = new ChannelCache<>();
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception {
@Test
public void testSetRefreshToken() throws Exception {
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT");
new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false);
MockOAuthClient mockOAuthClient = new MockOAuthClient();

OAuthManager oAuthManager =
Expand Down
Loading

0 comments on commit 2b26107

Please sign in to comment.