SNOW-1437885 Disable blob interleaving when in Iceberg mode #763

Merged: 6 commits merged on Jun 11, 2024

Changes from 3 commits
@@ -28,6 +28,10 @@ public static class Builder {
// Allows client to override some default parameter values
private Map<String, Object> parameterOverrides;

// Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should
// fail in this mode.
private boolean isIcebergMode;

// Indicates whether it's under test mode
private boolean isTestMode;

@@ -45,6 +49,11 @@ public Builder setParameterOverrides(Map<String, Object> parameterOverrides) {
return this;
}

public Builder setIcebergMode(boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
return this;
}

public Builder setIsTestMode(boolean isTestMode) {
this.isTestMode = isTestMode;
return this;
@@ -58,7 +67,12 @@ public SnowflakeStreamingIngestClient build() {
SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL));

return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, this.isTestMode);
this.name,
accountURL,
prop,
this.parameterOverrides,
this.isIcebergMode,
this.isTestMode);
}
}
}
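For context, a minimal usage sketch of the new builder flag. The factory entry point (SnowflakeStreamingIngestClientFactory.builder), the setProperties call, and the property values are assumptions based on the SDK's existing public API and are not part of this diff; only setIcebergMode(boolean) is added by this PR.

```java
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.Constants;

public class IcebergModeClientSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(Constants.ACCOUNT_URL, "https://<account>.snowflakecomputing.com:443");
    // Authentication properties (user, private key, role, ...) omitted in this sketch.

    // setIcebergMode(true) marks the client as streaming to Iceberg tables and disables
    // blob interleaving: at most one chunk per blob (see the ParameterProvider change below).
    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
            .setProperties(props)
            .setIcebergMode(true)
            .build()) {
      // Open channels and ingest rows here.
    }
  }
}
```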
@@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
// Indicates whether the client has closed
private volatile boolean isClosed;

// Indicates whether the client is streaming to Iceberg tables
private final boolean isIcebergMode;

// Indicates whether the client is under test mode
private final boolean isTestMode;

@@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param accountURL Snowflake account url
* @param prop connection properties
* @param httpClient http client for sending request
* @param isIcebergMode whether we're streaming to Iceberg tables
* @param isTestMode whether we're under test mode
* @param requestBuilder http request builder
* @param parameterOverrides parameters we override in case we want to set different values
@@ -161,13 +165,15 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
SnowflakeURL accountURL,
Properties prop,
CloseableHttpClient httpClient,
boolean isIcebergMode,
boolean isTestMode,
RequestBuilder requestBuilder,
Map<String, Object> parameterOverrides) {
this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode);

this.name = name;
String accountName = accountURL == null ? null : accountURL.getAccount();
this.isIcebergMode = isIcebergMode;
this.isTestMode = isTestMode;
this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient;
this.channelCache = new ChannelCache<>();
@@ -258,16 +264,17 @@ public SnowflakeStreamingIngestClientInternal(
SnowflakeURL accountURL,
Properties prop,
Map<String, Object> parameterOverrides,
boolean isIcebergMode,
boolean isTestMode) {
this(name, accountURL, prop, null, isTestMode, null, parameterOverrides);
this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides);
}

/*** Constructor for TEST ONLY
*
* @param name the name of the client
*/
SnowflakeStreamingIngestClientInternal(String name) {
this(name, null, null, null, true, null, new HashMap<>());
SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) {
this(name, null, null, null, isIcebergMode, true, null, new HashMap<>());
}

// TESTING ONLY - inject the request builder
25 changes: 22 additions & 3 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -64,6 +64,12 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

// Iceberg mode default parameters
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1;

// Whether the provided parameters need to be verified and modified to meet Iceberg mode requirements
private final boolean isIcebergMode;
Contributor: What happens if a customer creates an Iceberg client but overrides this value to false?

Contributor (author): I don't think clients can inject a ParameterProvider directly; it is only instantiated by SnowflakeStreamingIngestClientInternal (see the constructor change above).


/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -80,14 +86,18 @@ public class ParameterProvider {
*
* @param parameterOverrides Map of parameter name to value
* @param props Properties from profile file
* @param isIcebergMode Whether the provided parameters need to be verified and modified to meet
*     Iceberg mode requirements
*/
public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
public ParameterProvider(
Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
this.isIcebergMode = isIcebergMode;
this.setParameterMap(parameterOverrides, props);
}

/** Empty constructor for tests */
public ParameterProvider() {
this(null, null);
public ParameterProvider(boolean isIcebergMode) {
this(null, null, isIcebergMode);
}

private void updateValue(
@@ -99,6 +109,8 @@ private void updateValue(
this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue));
} else if (props != null) {
this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
} else {
this.parameterMap.put(key, defaultValue);
}
}

@@ -188,6 +200,13 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);

// Required parameter override for Iceberg mode
if (this.isIcebergMode) {
this.parameterMap.put(
    MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
    MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT);
}
}

Collaborator: Why not call this.updateValue here? That's the only method that updates parameterMap before this block was added; let's keep it that way?

Contributor (author): updateValue considers props and parameterOverrides before the default value, and in Iceberg mode we should not allow the user to set the chunk count above 1, which is why I call put directly. I think we can use updateValue(key, value, null, null) to achieve the same result; what do you think?
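To make the trade-off in this thread concrete, here is a small self-contained sketch of the precedence behavior being discussed. The updateValue signature and its first null-check are assumptions (they sit outside the visible hunk), and the string key merely stands in for the ParameterProvider constant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UpdateValueEquivalenceSketch {

  private static final Map<String, Object> parameterMap = new HashMap<>();

  // Mirrors the shape of ParameterProvider#updateValue shown in this diff (signature assumed).
  private static void updateValue(
      String key, Object defaultValue, Map<String, Object> parameterOverrides, Properties props) {
    if (parameterOverrides != null && props != null) {
      parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue));
    } else if (props != null) {
      parameterMap.put(key, props.getOrDefault(key, defaultValue));
    } else {
      // The else branch added in this PR: fall back to the default unconditionally.
      parameterMap.put(key, defaultValue);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    overrides.put("max_chunks_in_blob_and_registration_request", 20);

    // Passing null for both overrides and props ignores any user-supplied value and forces
    // the Iceberg-mode default of 1, just like calling parameterMap.put(key, 1) directly.
    updateValue("max_chunks_in_blob_and_registration_request", 1, null, null);

    System.out.println(parameterMap); // {max_chunks_in_blob_and_registration_request=1}
  }
}
```

Either way, the user-supplied override is ignored in Iceberg mode, which is the behavior the author is after.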

/** @return Longest interval in milliseconds between buffer flushes */
@@ -24,7 +24,7 @@ public class ChannelCacheTest {
@Before
public void setup() {
cache = new ChannelCache<>();
client = new SnowflakeStreamingIngestClientInternal<>("client");
client = new SnowflakeStreamingIngestClientInternal<>("client", false);
channel1 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel1",
@@ -48,11 +48,22 @@
import net.snowflake.ingest.utils.SFException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
public class FlushServiceTest {

@Parameterized.Parameters(name = "isIcebergMode: {0}")
public static Object[] isIcebergMode() {
return new Object[] {false, true};
}

@Parameterized.Parameter public static boolean isIcebergMode;

Contributor: I thought what we should be doing is to test isIcebergMode with {true, false}.

Contributor (author): This line is just the format of the test name; the first parameter is either false or true, supplied by the isIcebergMode() provider above.

public FlushServiceTest() {
this.testContextFactory = ParquetTestContext.createFactory();
}
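To clarify the naming question in the thread above: with the JUnit 4 Parameterized runner, the name attribute only formats the display name of each run, while the values come from the @Parameters provider, so the suite executes once with isIcebergMode = false and once with true. A minimal standalone sketch (class, method, and field names here are illustrative only):

```java
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class IcebergModeNamingSketch {

  @Parameterized.Parameters(name = "isIcebergMode: {0}")
  public static Object[] parameters() {
    return new Object[] {false, true};
  }

  @Parameterized.Parameter public boolean isIcebergMode;

  @Test
  public void runsOncePerValue() {
    // Reported as runsOncePerValue[isIcebergMode: false] and runsOncePerValue[isIcebergMode: true].
    assertTrue(isIcebergMode || !isIcebergMode);
  }
}
```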
@@ -86,7 +97,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
TestContext() {
stage = Mockito.mock(StreamingIngestStage.class);
Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix");
parameterProvider = new ParameterProvider();
parameterProvider = new ParameterProvider(isIcebergMode);
client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
channelCache = new ChannelCache<>();
@@ -586,7 +597,10 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
Math.ceil(
(double) numberOfRows
/ channelsPerTable
/ ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);
/ (isIcebergMode
? ParameterProvider
.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT
: ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT));

final TestContext<List<List<Object>>> testContext = testContextFactory.create();
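As a worked example of the expected-blob-count formula above: the non-Iceberg default for MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST is assumed to be 20 here purely for illustration, while the Iceberg-mode default of 1 comes from this PR.

```java
// Standalone arithmetic check of the blob-split expectation (values are illustrative).
public class BlobSplitArithmeticSketch {
  public static void main(String[] args) {
    int numberOfRows = 100;
    int channelsPerTable = 2;
    int nonIcebergMaxChunks = 20; // assumed default, for illustration only
    int icebergMaxChunks = 1;     // MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT

    int expectedNonIceberg =
        (int) Math.ceil((double) numberOfRows / channelsPerTable / nonIcebergMaxChunks);
    int expectedIceberg =
        (int) Math.ceil((double) numberOfRows / channelsPerTable / icebergMaxChunks);

    System.out.println(expectedNonIceberg); // ceil(100 / 2 / 20) = 3 blobs
    System.out.println(expectedIceberg);    // ceil(100 / 2 / 1)  = 50 blobs
  }
}
```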

@@ -861,7 +875,7 @@ public void testInvalidateChannels() {
// Create a new Client in order to not interfere with other tests
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
ParameterProvider parameterProvider = new ParameterProvider();
ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode);
ChannelCache<StubChunkData> channelCache = new ChannelCache<>();
Mockito.when(client.getChannelCache()).thenReturn(channelCache);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);
@@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception {
@Test
public void testSetRefreshToken() throws Exception {
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT");
new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false);
MockOAuthClient mockOAuthClient = new MockOAuthClient();

OAuthManager oAuthManager =