-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding the context to the source connectors #388
Adding the context to the source connectors #388
Conversation
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java
Outdated
Show resolved
Hide resolved
@Override | ||
public void configureDistributionStrategy(final int maxTasks) { | ||
this.maxTasks = maxTasks; | ||
return Math.floorMod(ctx.getStorageKey().hashCode(), maxTasks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be too slow a calculation but we can leave it for now. There is a much faster mod calculation in commons collections, but we would have to extract it.
...s/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
Outdated
Show resolved
Hide resolved
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
Show resolved
Hide resolved
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
Show resolved
Hide resolved
772c511
to
e8452bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a test case for DistributionStrategy that shows negative hash values generate do not generate negative tasks or throw exceptions.
commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
Outdated
Show resolved
Hide resolved
@@ -27,18 +28,34 @@ | |||
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer | |||
* workers than tasks, and they will be assigned the remaining tasks as work completes. | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still a true statement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I believe so, i reviewed it before and I do think it is a true statement as every object in the s3 case will only ever be able to be processed by the same task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it depends upon the strategy implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reworded it
...ons/src/test/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtilsTest.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3Key.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
Outdated
Show resolved
Hide resolved
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
Show resolved
Hide resolved
b8e2b2a
to
9525803
Compare
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
Signed-off-by: Aindriu Lavelle <[email protected]>
commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
Outdated
Show resolved
Hide resolved
b226d52
to
f4a470b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good.
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/task/Context.java
Outdated
Show resolved
Hide resolved
@@ -27,18 +28,34 @@ | |||
* sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer | |||
* workers than tasks, and they will be assigned the remaining tasks as work completes. | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it depends upon the strategy implemented.
commons/src/main/java/io/aiven/kafka/connect/common/config/CommonConfig.java
Show resolved
Hide resolved
...connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
5cfecf7
to
aa0ca1e
Compare
Updating and refactoring the distribution strategies based on feedback.