Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1061848: DirectTopicPartitionChannel - copy of the old implementation with removed buffer #852

Merged

Conversation

sfc-gh-akowalczyk
Copy link
Contributor

@sfc-gh-akowalczyk sfc-gh-akowalczyk commented May 29, 2024

Overview

SNOW-1061848

This is my second attempt to remove the buffering from a quite tightly convoluted piece of code in the BufferredTopicPartitionChannel class.

To reduce the extent of changes, and make it easily comparable to the old implementation, the PR is prepared in 3 steps (1 step is 1 commit).

  1. DirectTopicPartionChannel is just a clone of BufferrredTopicPartitionChannel - you can skip browsing this commit.
  2. My attempt to delete all buffers-related classes and make the code work again. Any doubts were solved by adapting Revi changes from the PR Rcheng doublebuff2 #735
  3. Fix of unit tests to make them green

The current implementation is validated with:

  • TopicPartitionChannelTest - all green
  • some of TopicPartitionChannelIT -however it still requires manual change in the code to follow a single buffer path.

This change is safe to merge as we still do not allow for using a single buffer. As the next PRs I'd like to gradually make other tests passing.

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - Suggest why it is not param protected
  • Is his change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

@sfc-gh-akowalczyk sfc-gh-akowalczyk requested a review from a team as a code owner May 29, 2024 11:10
@sfc-gh-akowalczyk sfc-gh-akowalczyk force-pushed the akowalczyk/snow-1061848-direct-topic-channel-body branch from 2615388 to a4ce755 Compare May 29, 2024 11:12
* @return InsertValidationResponse a response that wraps around InsertValidationResponse
*/
private InsertValidationResponse insertRowsWithFallback(Map<String, Object> transformedRecord, long offset) {
Fallback<Object> reopenChannelFallbackExecutorForInsertRows =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know this code was here initially, but it does obscure the view a lot. what do you think - could we perhaps extract the failsafe handling outside, to some other class, make it sort of an aspect applied on top of the channel, so from channel perspective- we simply insert rows?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while this code is functional, its not readable or easily debuggable. Probably out of scope for this pr, but yes i support either splitting the fallback logic into a different class or using a different implementation

Copy link
Contributor

@sfc-gh-gjachimko sfc-gh-gjachimko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it does the job, however, the amount of state managed by the channel is huge.
shall we consider refactoring it into smaller pieces?
i think for scenarios like that, building something like a flux stream could significantly reduce the complexity.. but that's something we could consider later.

Copy link
Contributor

@sfc-gh-lshcharbaty sfc-gh-lshcharbaty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job!
Apply ./format.sh, please.

*/
private InsertRowsResponse insertRowsWithFallback(StreamingBuffer buffer) {
private InsertValidationResponse insertRowsWithFallback(Map<String, Object> transformedRecord, long offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: insertRowWithFallback.

Copy link
Collaborator

@sfc-gh-rcheng sfc-gh-rcheng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks for the change, i like how you moved the schematization logic. left comments about logging and other small changes

return;
}
// Accept the incoming record only if we don't have a valid offset token at server side, or the
// incoming record offset is 1 + the processed offset
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace 1+ with greater in the comment here to match behavior

* @return InsertValidationResponse a response that wraps around InsertValidationResponse
*/
private InsertValidationResponse insertRowsWithFallback(Map<String, Object> transformedRecord, long offset) {
Fallback<Object> reopenChannelFallbackExecutorForInsertRows =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while this code is functional, its not readable or easily debuggable. Probably out of scope for this pr, but yes i support either splitting the fallback logic into a different class or using a different implementation

}

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> input() {
return Arrays.asList(new Object[][] {{true}, {false}});
return Arrays.asList(new Object[][] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets name the variables similar to this test for better readability in intellij

* being invoked)
* @return Approximate long size of record in bytes. 0 if record is broken
*/
protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nice were removing this! we actually make this approximation twice per record, so hopefully this will speed up ingestion to not make this pass through

@sfc-gh-akowalczyk sfc-gh-akowalczyk requested a review from a team as a code owner June 3, 2024 13:04
@sfc-gh-akowalczyk sfc-gh-akowalczyk force-pushed the akowalczyk/snow-1061848-direct-topic-channel-body branch from cc2b6f9 to 0ae0d55 Compare June 3, 2024 13:25
@sfc-gh-akowalczyk sfc-gh-akowalczyk merged commit 951f39f into master Jun 3, 2024
78 of 81 checks passed
@sfc-gh-akowalczyk sfc-gh-akowalczyk deleted the akowalczyk/snow-1061848-direct-topic-channel-body branch June 3, 2024 14:29
ConfluentSemaphore pushed a commit to confluentinc/snowflake-kafka-connector that referenced this pull request Aug 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants