-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-979849: Support user to pass a user defined offset token verification logic as part of channel creation #680
Conversation
…-ingest-java into tzhang-si-overflow
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
Outdated
Show resolved
Hide resolved
@@ -66,6 +69,8 @@ public String toString() { | |||
TelemetryService(CloseableHttpClient httpClient, String clientName, String url) { | |||
this.clientName = clientName; | |||
this.telemetry = (TelemetryClient) TelemetryClient.createSessionlessTelemetry(httpClient, url); | |||
// At most once every seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// At most once every seconds | |
// At most once every second |
package net.snowflake.ingest.streaming; | ||
|
||
/** | ||
* Interface to create a user defined offset token verification function that is used to verify |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Interface to create a user defined offset token verification function that is used to verify | |
* Interface to provide a custom offset verification logic. If specified, verification failures will be logged as warnings and reported to Snowflake. |
* Interface to create a user defined offset token verification function that is used to verify | ||
* offset behaviors, if the verification failed, we will log a warning and report it to SF | ||
* | ||
* <p>Below is an example that verifies all offset tokens need to monotonically increment numbers: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* <p>Below is an example that verifies all offset tokens need to monotonically increment numbers: | |
* <p>Below is an example that verifies that all offset tokens are monotonically increasing numbers: |
* } | ||
* } | ||
* } catch (NumberFormatException ignored) { | ||
* // Do nothing since we can't compare the offset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: One should probably report a mismatch if offset is not of the right format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually allow offset token to be any arbitrary String
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get that. I was only referring to this specific example where the user knows that it should be a parseable as a number.
@@ -669,6 +694,30 @@ private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) { | |||
} | |||
} | |||
|
|||
/** | |||
* We verify some offset expect behaviors based on the provided verification logic and report to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* We verify some offset expect behaviors based on the provided verification logic and report to | |
* We verify the offset based on the provided verification logic and report to |
*/ | ||
private void checkOffsetMismatch( | ||
String prevEndOffset, String curStartOffset, String curEndOffset, int rowCount) { | ||
if (telemetryService != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should still do the verification even if there is no telemetry service. You can check the service before using it. Some users may want the warning even if they don't want anything sent to Snowflake.
|
||
package net.snowflake.ingest.streaming; | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should provide some guidance around when this should be used and what to do if users notice mismatches etc. Okay to point to Snowflake docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a sentence, PTAL, I don't think we have any documentation on this.
…-ingest-java into tzhang-si-overflow
@@ -78,6 +79,35 @@ public static Object[] compressionAlgorithms() { | |||
|
|||
@Parameter public String compressionAlgorithm; | |||
|
|||
private static final OffsetTokenVerificationFunction offsetTokenVerificationFunction = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we verify the behaviour of the this verification function (since it's the one we recommend to users) by test cases that have/don't have a mismatch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added a small test
* offset behaviors, if the verification failed, we will log a warning and report it to SF | ||
* Interface to provide a custom offset verification logic. If specified, verification failures will | ||
* be logged as warnings and reported to Snowflake. This interface could be used when there are | ||
* certain assumption about the offset token behavior and please reach out to Snowflake if you |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* certain assumption about the offset token behavior and please reach out to Snowflake if you | |
* certain assumption about the offset token behavior. Please reach out to Snowflake if you |
@@ -66,6 +69,8 @@ public String toString() { | |||
TelemetryService(CloseableHttpClient httpClient, String clientName, String url) { | |||
this.clientName = clientName; | |||
this.telemetry = (TelemetryClient) TelemetryClient.createSessionlessTelemetry(httpClient, url); | |||
// At most once every second | |||
this.rateLimiter = RateLimiter.create(1.0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isnt this too low? essentially it is one per second across all channels IIUC?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, what about we create one RateLimiter per channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah.. one per channel is okay!
src/main/java/net/snowflake/ingest/streaming/OffsetTokenVerificationFunction.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
left a suggestion on ratelimiter.
With this PR, user could pass in a user defined offset token verification function that is used to verify offset behaviors, if the verification failed, we will log an error and report it to SF