-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-967235 Sends start and end offsets from the Client SDK #675
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking this JIRA! I believe your logic needs to based on my PR which will allow customer to input the start offset of a batch, otherwise the offset we send to server side will never be continuous. And glad we're thinking about doing the same thing :)
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
Show resolved
Hide resolved
/** | ||
* Updates the channel's offset token. | ||
* | ||
* @param offsetToken new offset token | ||
*/ | ||
void setOffsetToken(String offsetToken) { | ||
if (!startOffsetTokenSet) { | ||
startOffsetToken = offsetToken; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we're setting the startOffsetToken to offsetToken, offsetToken is the last offset of the first batch, so basically we're sending (last offset of first batch, last offset of the chunk) which won't be continuous from the previous chunk, also the row count won't be matched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where we get offset token for a batch, no? We copy the data here from ChannelRuntimeState
for a RowBuffer
into a ChannelDTO
or something further down the line. So with startOffsetTokenSet
we essentially grab the FIRST offset token provided in a call to insertRows
for a batch and store that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you see how ChannelRuntimeState is being constructed, it's only being constructed once in the SnowflakeStreamingIngestChannelInternal and never being reseted between flushes. You can run a test to verify, but I think this means the startOffsetToken will always be the same for all chunks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the logic in ChannelRuntimeState
already to store the correct start and end offset, so this is not needed anymore
@sfc-gh-tjones #676 is merged and should unblock your PR |
ad1414a
to
97e185f
Compare
This sends the start and end offset tokens for a channel when sending a blob to Snowflake. Right now only the end offset is sent which makes reasoning about ranges difficult. @test verified via existing tests in `RowBufferTest` and `SnowflakeStreamingIngestChannelTest`
97e185f
to
88a250e
Compare
@@ -17,6 +17,7 @@ class ChannelMetadata { | |||
private final Long clientSequencer; | |||
private final Long rowSequencer; | |||
@Nullable private final String offsetToken; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable private final String offsetToken; | |
@Nullable private final String endOffsetToken; |
this is basically an internal field name, does it make sense to rename it as well for better readability?
This sends the start and end offset tokens for a channel when sending a blob to Snowflake. Right now only the end offset is sent which makes reasoning about ranges difficult.
@test verified via existing tests in
RowBufferTest
andSnowflakeStreamingIngestChannelTest