-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1437885 Disable blob interleaving when in Iceberg mode #763
Conversation
@@ -123,6 +123,10 @@ List<List<ChannelData<T>>> getData() { | |||
// blob encoding version | |||
private final Constants.BdecVersion bdecVersion; | |||
|
|||
// Indicates if it's flushing to Iceberg tables, a blob could only contain one chunk under Iceberg | |||
// mode | |||
private final boolean isIcebergMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this have a more general name, e.g. isNonInterleavedMode
or disableInterleavedBlobs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we can make this more general for the internal names since we disable interleaved mode due to other reasons in the future, not only Iceberg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the non-interleave parameter for now as we could control this via MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST
} | ||
|
||
/*** Constructor for TEST ONLY | ||
* | ||
* @param name the name of the client | ||
*/ | ||
SnowflakeStreamingIngestClientInternal(String name) { | ||
this(name, null, null, null, true, null, new HashMap<>()); | ||
this(name, null, null, null, false, true, null, new HashMap<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's about 20 testcases using this ctor, I'd want to get iceberg mode test coverage for all those tests too (unless there's a good reason to not need it).
Can you expose isIcebergMode on this test mode ctor's signature too, and parameterize the calling tests to run with both icebergMode = on / off?
I see we're using JUnit which has good support for doing multiple test runs with different parameter values declared on an annotation / on a values provider defined as a test class member, so it'll be a one-time cost to set this up in all the test classes, but in exchange we'll get comprehensive ongoing test coverage.
cc @sfc-gh-tzhang in you case you want to weigh in on this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, you can search for Parameterized
and see how to use that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added parameters for unit tests.
@@ -437,7 +447,7 @@ && shouldStopProcessing( | |||
} | |||
// Add processed channels to the current blob, stop if we need to create a new blob | |||
blobData.add(channelsDataPerTable.subList(0, idx)); | |||
if (idx != channelsDataPerTable.size()) { | |||
if (idx != channelsDataPerTable.size() || isIcebergMode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to guard against the possibility of having isIcebergMode peppered across 10 different places in code, making it that much harder to maintain the client.
Ideally we should have one place in the whole client that does if (isIcebergMode) { /* initialize booleans / ints / settings one way */ } else { /* initialize another way */ }
- it seems that the ParameterProvider is the best place to do this, or we introduce a separate ClientSettings class that holds all these settings that control different behaviors in different places in the client.
I also see there are three parameters on the parameter provider that need different values for iceberg - MAX_CHUNK_SIZE_IN_BYTES (set to 512 MB rn) and MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT (set to 100 rn) and MAX_BLOB_SIZE_IN_BYTES (set to 1 GB rn). This is in addition to MAX_CLIENT_LAG needing an iceberg-specific value.
It looks like if parameterProvider.getMaxChunksInBlobAndRegistrationRequest() returns 1 then you'll get this same desired behavior ?
Note - in the parameterProvider you'll need to validate that customers can't override MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST and it can only ever be 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleted the isIcebergMode
param and use MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST
for now.
src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple comments, PTAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there're a few things missing in this PR:
- Block creation of regular channels on Iceberg client and creation of iceberg channels on regular client
- No need to call the client/configure endpoint for Iceberg client
Do you plan to do that in a separate PR?
src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
Outdated
Show resolved
Hide resolved
@@ -123,6 +123,10 @@ List<List<ChannelData<T>>> getData() { | |||
// blob encoding version | |||
private final Constants.BdecVersion bdecVersion; | |||
|
|||
// Indicates if it's flushing to Iceberg tables, a blob could only contain one chunk under Iceberg | |||
// mode | |||
private final boolean isIcebergMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we can make this more general for the internal names since we disable interleaved mode due to other reasons in the future, not only Iceberg
} | ||
|
||
/*** Constructor for TEST ONLY | ||
* | ||
* @param name the name of the client | ||
*/ | ||
SnowflakeStreamingIngestClientInternal(String name) { | ||
this(name, null, null, null, true, null, new HashMap<>()); | ||
this(name, null, null, null, false, true, null, new HashMap<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, you can search for Parameterized
and see how to use that
Thanks for the comments and tips! Added |
// Required parameter override for Iceberg mode | ||
if (this.isIcebergMode) { | ||
this.parameterMap.put( | ||
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not call this.updateValue here ? That's the only method that's updating parameterMap before this block was added, lets keep it that way..?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The updateValue
considers props and paramOverrides before default value. While under iceberg mode we should not allow user to set the chunk number > 1, that's why I use the put
method directly. I think we can use updateValue(key, value, null, null)
to achieve the same result, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, left one small comment that you can take in the next PR too as you have other changes waiting for this to go in.
src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
Outdated
Show resolved
Hide resolved
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1; | ||
|
||
// If the provided parameters need to be verified and modified to meet Iceberg mode | ||
private final boolean isIcebergMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if customer create a Iceberg client but override this value to false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think client can inject ParameterProvider
directly, instantiate is called by ClientInternal
at here.
src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Outdated
Show resolved
Hide resolved
public class FlushServiceTest { | ||
|
||
@Parameterized.Parameters(name = "isIcebergMode: {0}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought what we should be doing is to test the isIcebergMode with {true, false}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is the format of the test name, where the first parameter is either false
or true
from here.
Per offline discussion with @sfc-gh-tzhang , moved |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, PTAL, otherwise LGTM! One request I have is could we merge all the Iceberg related changes to a feature branch instead of main
? We can move everything to main
once we ready, otherwise the new release will contain all the WIP changes and customer might do something with it.
...ain/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Outdated
Show resolved
Hide resolved
* Add parameters & disable interleaving mode
SnowflakeStreamingIngestClientInternal
to activate/deactivate streaming to Iceberg tables.